This is an automated email from the ASF dual-hosted git repository.

hope pushed a commit to branch release-1.4
in repository https://gitbox.apache.org/repos/asf/paimon.git

commit 68eb6b2e0bf960afefc5f3bc2dd4127d4e4fecd3
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Mar 30 14:34:30 2026 +0800

    [core][spark] Introduce Full Text Search interfaces and Spark function 
(#7548)
    
    Symmetric implementation of full text search based on the existing
    VectorSearch pattern.
---
 .../paimon/globalindex/GlobalIndexReader.java      |   5 +
 .../globalindex/OffsetGlobalIndexReader.java       |   6 +
 .../apache/paimon/predicate/FullTextSearch.java    |  72 ++++
 .../TestFullTextGlobalIndexReader.java             | 254 ++++++++++++
 .../TestFullTextGlobalIndexWriter.java             | 110 ++++++
 .../testfulltext/TestFullTextGlobalIndexer.java    |  62 +++
 .../TestFullTextGlobalIndexerFactory.java          |  37 +-
 ....apache.paimon.globalindex.GlobalIndexerFactory |   1 +
 .../java/org/apache/paimon/table/FormatTable.java  |   7 +
 .../apache/paimon/table/FullTextSearchTable.java   | 101 +++++
 .../java/org/apache/paimon/table/InnerTable.java   |   7 +
 .../main/java/org/apache/paimon/table/Table.java   |   4 +
 .../apache/paimon/table/source/FullTextRead.java   |  27 +-
 .../paimon/table/source/FullTextReadImpl.java      | 127 ++++++
 .../apache/paimon/table/source/FullTextScan.java   |  26 +-
 .../paimon/table/source/FullTextScanImpl.java      |  88 +++++
 .../paimon/table/source/FullTextSearchBuilder.java |  47 +++
 .../table/source/FullTextSearchBuilderImpl.java    |  74 ++++
 .../paimon/table/source/FullTextSearchSplit.java   |  82 ++++
 .../table/source/FullTextSearchBuilderTest.java    | 424 +++++++++++++++++++++
 .../paimon/spark/PaimonBaseScanBuilder.scala       |   2 +
 .../scala/org/apache/paimon/spark/PaimonScan.scala |   3 +-
 .../scala/org/apache/paimon/spark/PaimonScan.scala |   3 +-
 .../org/apache/paimon/spark/PaimonBaseScan.scala   |  28 +-
 .../paimon/spark/PaimonBaseScanBuilder.scala       |   2 +
 .../scala/org/apache/paimon/spark/PaimonScan.scala |   3 +-
 .../apache/paimon/spark/PaimonScanBuilder.scala    |  11 +-
 .../plans/logical/PaimonTableValuedFunctions.scala |  89 ++++-
 .../org/apache/paimon/spark/read/BaseScan.scala    |   6 +-
 .../paimon/spark/sql/FullTextSearchTest.scala      | 223 +++++++++++
 30 files changed, 1869 insertions(+), 62 deletions(-)

diff --git 
a/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexReader.java
 
b/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexReader.java
index 1b8c17bf66..f639273575 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexReader.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexReader.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.globalindex;
 
+import org.apache.paimon.predicate.FullTextSearch;
 import org.apache.paimon.predicate.FunctionVisitor;
 import org.apache.paimon.predicate.LeafPredicate;
 import org.apache.paimon.predicate.VectorSearch;
@@ -47,4 +48,8 @@ public interface GlobalIndexReader extends 
FunctionVisitor<Optional<GlobalIndexR
     default Optional<ScoredGlobalIndexResult> visitVectorSearch(VectorSearch 
vectorSearch) {
         throw new UnsupportedOperationException();
     }
+
+    default Optional<ScoredGlobalIndexResult> 
visitFullTextSearch(FullTextSearch fullTextSearch) {
+        throw new UnsupportedOperationException();
+    }
 }
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/globalindex/OffsetGlobalIndexReader.java
 
b/paimon-common/src/main/java/org/apache/paimon/globalindex/OffsetGlobalIndexReader.java
index 21f06f59f3..0122060dfb 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/globalindex/OffsetGlobalIndexReader.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/globalindex/OffsetGlobalIndexReader.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.globalindex;
 
 import org.apache.paimon.predicate.FieldRef;
+import org.apache.paimon.predicate.FullTextSearch;
 import org.apache.paimon.predicate.VectorSearch;
 
 import java.io.IOException;
@@ -122,6 +123,11 @@ public class OffsetGlobalIndexReader implements 
GlobalIndexReader {
                 .map(r -> r.offset(offset));
     }
 
+    @Override
+    public Optional<ScoredGlobalIndexResult> 
visitFullTextSearch(FullTextSearch fullTextSearch) {
+        return wrapped.visitFullTextSearch(fullTextSearch).map(r -> 
r.offset(offset));
+    }
+
     private Optional<GlobalIndexResult> 
applyOffset(Optional<GlobalIndexResult> result) {
         return result.map(r -> r.offset(offset));
     }
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/predicate/FullTextSearch.java 
b/paimon-common/src/main/java/org/apache/paimon/predicate/FullTextSearch.java
new file mode 100644
index 0000000000..5b91d0bbc6
--- /dev/null
+++ 
b/paimon-common/src/main/java/org/apache/paimon/predicate/FullTextSearch.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.predicate;
+
+import org.apache.paimon.globalindex.GlobalIndexReader;
+import org.apache.paimon.globalindex.ScoredGlobalIndexResult;
+
+import java.io.Serializable;
+import java.util.Optional;
+
+/** FullTextSearch to perform full-text search on a text column. */
+public class FullTextSearch implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private final String queryText;
+    private final String fieldName;
+    private final int limit;
+
+    public FullTextSearch(String queryText, int limit, String fieldName) {
+        if (queryText == null || queryText.isEmpty()) {
+            throw new IllegalArgumentException("Query text cannot be null or 
empty");
+        }
+        if (limit <= 0) {
+            throw new IllegalArgumentException("Limit must be positive, got: " 
+ limit);
+        }
+        if (fieldName == null || fieldName.isEmpty()) {
+            throw new IllegalArgumentException("Field name cannot be null or 
empty");
+        }
+        this.queryText = queryText;
+        this.limit = limit;
+        this.fieldName = fieldName;
+    }
+
+    public String queryText() {
+        return queryText;
+    }
+
+    public int limit() {
+        return limit;
+    }
+
+    public String fieldName() {
+        return fieldName;
+    }
+
+    public Optional<ScoredGlobalIndexResult> visit(GlobalIndexReader visitor) {
+        return visitor.visitFullTextSearch(this);
+    }
+
+    @Override
+    public String toString() {
+        return String.format(
+                "FullTextSearch{field=%s, query='%s', limit=%d}", fieldName, 
queryText, limit);
+    }
+}
diff --git 
a/paimon-common/src/test/java/org/apache/paimon/globalindex/testfulltext/TestFullTextGlobalIndexReader.java
 
b/paimon-common/src/test/java/org/apache/paimon/globalindex/testfulltext/TestFullTextGlobalIndexReader.java
new file mode 100644
index 0000000000..affc79e916
--- /dev/null
+++ 
b/paimon-common/src/test/java/org/apache/paimon/globalindex/testfulltext/TestFullTextGlobalIndexReader.java
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.globalindex.testfulltext;
+
+import org.apache.paimon.fs.SeekableInputStream;
+import org.apache.paimon.globalindex.GlobalIndexIOMeta;
+import org.apache.paimon.globalindex.GlobalIndexReader;
+import org.apache.paimon.globalindex.GlobalIndexResult;
+import org.apache.paimon.globalindex.ScoredGlobalIndexResult;
+import org.apache.paimon.globalindex.io.GlobalIndexFileReader;
+import org.apache.paimon.predicate.FieldRef;
+import org.apache.paimon.predicate.FullTextSearch;
+import org.apache.paimon.utils.RoaringNavigableMap64;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.charset.StandardCharsets;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Optional;
+import java.util.PriorityQueue;
+
+/**
+ * Test full-text index reader that performs brute-force text matching. Loads 
all documents into
+ * memory and scores them against the query using simple term-frequency 
matching.
+ *
+ * <p>Scoring: for each query term found in the document (case-insensitive), 
score += 1.0 /
+ * queryTermCount. A document that contains all query terms scores 1.0.
+ */
+public class TestFullTextGlobalIndexReader implements GlobalIndexReader {
+
+    private final GlobalIndexFileReader fileReader;
+    private final GlobalIndexIOMeta ioMeta;
+
+    private String[] documents;
+    private int count;
+
+    public TestFullTextGlobalIndexReader(
+            GlobalIndexFileReader fileReader, GlobalIndexIOMeta ioMeta) {
+        this.fileReader = fileReader;
+        this.ioMeta = ioMeta;
+    }
+
+    @Override
+    public Optional<ScoredGlobalIndexResult> 
visitFullTextSearch(FullTextSearch fullTextSearch) {
+        try {
+            ensureLoaded();
+        } catch (IOException e) {
+            throw new RuntimeException("Failed to load test full-text index", 
e);
+        }
+
+        String queryText = fullTextSearch.queryText();
+        int limit = fullTextSearch.limit();
+        int effectiveK = Math.min(limit, count);
+        if (effectiveK <= 0) {
+            return Optional.empty();
+        }
+
+        String[] queryTerms = queryText.toLowerCase(Locale.ROOT).split("\\s+");
+
+        // Min-heap: smallest score at head, so we evict the weakest candidate.
+        PriorityQueue<ScoredRow> topK =
+                new PriorityQueue<>(effectiveK + 1, 
Comparator.comparingDouble(s -> s.score));
+
+        for (int i = 0; i < count; i++) {
+            float score = computeScore(documents[i], queryTerms);
+            if (score <= 0) {
+                continue;
+            }
+            if (topK.size() < effectiveK) {
+                topK.offer(new ScoredRow(i, score));
+            } else if (score > topK.peek().score) {
+                topK.poll();
+                topK.offer(new ScoredRow(i, score));
+            }
+        }
+
+        if (topK.isEmpty()) {
+            return Optional.empty();
+        }
+
+        RoaringNavigableMap64 resultBitmap = new RoaringNavigableMap64();
+        Map<Long, Float> scoreMap = new HashMap<>(topK.size());
+        for (ScoredRow row : topK) {
+            resultBitmap.add(row.rowId);
+            scoreMap.put(row.rowId, row.score);
+        }
+
+        return Optional.of(ScoredGlobalIndexResult.create(() -> resultBitmap, 
scoreMap::get));
+    }
+
+    private static float computeScore(String document, String[] queryTerms) {
+        String lowerDoc = document.toLowerCase(Locale.ROOT);
+        float score = 0;
+        for (String term : queryTerms) {
+            if (lowerDoc.contains(term)) {
+                score += 1.0f / queryTerms.length;
+            }
+        }
+        return score;
+    }
+
+    private void ensureLoaded() throws IOException {
+        if (documents != null) {
+            return;
+        }
+
+        try (SeekableInputStream in = fileReader.getInputStream(ioMeta)) {
+            // Read header: count (4 bytes)
+            byte[] headerBytes = new byte[4];
+            readFully(in, headerBytes);
+            ByteBuffer header = ByteBuffer.wrap(headerBytes);
+            header.order(ByteOrder.LITTLE_ENDIAN);
+            count = header.getInt();
+
+            // Read documents
+            documents = new String[count];
+            for (int i = 0; i < count; i++) {
+                byte[] lenBytes = new byte[4];
+                readFully(in, lenBytes);
+                ByteBuffer lenBuf = ByteBuffer.wrap(lenBytes);
+                lenBuf.order(ByteOrder.LITTLE_ENDIAN);
+                int textLen = lenBuf.getInt();
+
+                byte[] textBytes = new byte[textLen];
+                readFully(in, textBytes);
+                documents[i] = new String(textBytes, StandardCharsets.UTF_8);
+            }
+        }
+    }
+
+    private static void readFully(SeekableInputStream in, byte[] buf) throws 
IOException {
+        int offset = 0;
+        while (offset < buf.length) {
+            int bytesRead = in.read(buf, offset, buf.length - offset);
+            if (bytesRead < 0) {
+                throw new IOException(
+                        "Unexpected end of stream: read "
+                                + offset
+                                + " bytes but expected "
+                                + buf.length);
+            }
+            offset += bytesRead;
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        documents = null;
+    }
+
+    // =================== unsupported predicate operations 
=====================
+
+    @Override
+    public Optional<GlobalIndexResult> visitIsNotNull(FieldRef fieldRef) {
+        return Optional.empty();
+    }
+
+    @Override
+    public Optional<GlobalIndexResult> visitIsNull(FieldRef fieldRef) {
+        return Optional.empty();
+    }
+
+    @Override
+    public Optional<GlobalIndexResult> visitStartsWith(FieldRef fieldRef, 
Object literal) {
+        return Optional.empty();
+    }
+
+    @Override
+    public Optional<GlobalIndexResult> visitEndsWith(FieldRef fieldRef, Object 
literal) {
+        return Optional.empty();
+    }
+
+    @Override
+    public Optional<GlobalIndexResult> visitContains(FieldRef fieldRef, Object 
literal) {
+        return Optional.empty();
+    }
+
+    @Override
+    public Optional<GlobalIndexResult> visitLike(FieldRef fieldRef, Object 
literal) {
+        return Optional.empty();
+    }
+
+    @Override
+    public Optional<GlobalIndexResult> visitLessThan(FieldRef fieldRef, Object 
literal) {
+        return Optional.empty();
+    }
+
+    @Override
+    public Optional<GlobalIndexResult> visitGreaterOrEqual(FieldRef fieldRef, 
Object literal) {
+        return Optional.empty();
+    }
+
+    @Override
+    public Optional<GlobalIndexResult> visitNotEqual(FieldRef fieldRef, Object 
literal) {
+        return Optional.empty();
+    }
+
+    @Override
+    public Optional<GlobalIndexResult> visitLessOrEqual(FieldRef fieldRef, 
Object literal) {
+        return Optional.empty();
+    }
+
+    @Override
+    public Optional<GlobalIndexResult> visitEqual(FieldRef fieldRef, Object 
literal) {
+        return Optional.empty();
+    }
+
+    @Override
+    public Optional<GlobalIndexResult> visitGreaterThan(FieldRef fieldRef, 
Object literal) {
+        return Optional.empty();
+    }
+
+    @Override
+    public Optional<GlobalIndexResult> visitIn(FieldRef fieldRef, List<Object> 
literals) {
+        return Optional.empty();
+    }
+
+    @Override
+    public Optional<GlobalIndexResult> visitNotIn(FieldRef fieldRef, 
List<Object> literals) {
+        return Optional.empty();
+    }
+
+    /** A row ID paired with its similarity score, used in the top-k min-heap. 
*/
+    private static class ScoredRow {
+        final long rowId;
+        final float score;
+
+        ScoredRow(long rowId, float score) {
+            this.rowId = rowId;
+            this.score = score;
+        }
+    }
+}
diff --git 
a/paimon-common/src/test/java/org/apache/paimon/globalindex/testfulltext/TestFullTextGlobalIndexWriter.java
 
b/paimon-common/src/test/java/org/apache/paimon/globalindex/testfulltext/TestFullTextGlobalIndexWriter.java
new file mode 100644
index 0000000000..cf8d2de24a
--- /dev/null
+++ 
b/paimon-common/src/test/java/org/apache/paimon/globalindex/testfulltext/TestFullTextGlobalIndexWriter.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.globalindex.testfulltext;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.fs.PositionOutputStream;
+import org.apache.paimon.globalindex.GlobalIndexSingletonWriter;
+import org.apache.paimon.globalindex.ResultEntry;
+import org.apache.paimon.globalindex.io.GlobalIndexFileWriter;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Test full-text index writer that stores all text documents in memory and 
writes them to a single
+ * binary file on {@link #finish()}.
+ *
+ * <p>Binary format (all values little-endian):
+ *
+ * <pre>
+ *   [4 bytes] count (int)
+ *   For each document:
+ *     [4 bytes] text length in bytes (int)
+ *     [N bytes] UTF-8 text
+ * </pre>
+ */
+public class TestFullTextGlobalIndexWriter implements 
GlobalIndexSingletonWriter {
+
+    private static final String FILE_NAME_PREFIX = "test-fulltext";
+
+    private final GlobalIndexFileWriter fileWriter;
+    private final List<String> documents;
+
+    public TestFullTextGlobalIndexWriter(GlobalIndexFileWriter fileWriter) {
+        this.fileWriter = fileWriter;
+        this.documents = new ArrayList<>();
+    }
+
+    @Override
+    public void write(Object fieldData) {
+        if (fieldData == null) {
+            throw new IllegalArgumentException("Text field data must not be 
null");
+        }
+
+        String text;
+        if (fieldData instanceof String) {
+            text = (String) fieldData;
+        } else if (fieldData instanceof BinaryString) {
+            text = fieldData.toString();
+        } else {
+            throw new IllegalArgumentException(
+                    "Unsupported text type: " + 
fieldData.getClass().getName());
+        }
+        documents.add(text);
+    }
+
+    @Override
+    public List<ResultEntry> finish() {
+        if (documents.isEmpty()) {
+            return Collections.emptyList();
+        }
+
+        try {
+            String fileName = fileWriter.newFileName(FILE_NAME_PREFIX);
+            try (PositionOutputStream out = 
fileWriter.newOutputStream(fileName)) {
+                // Header: count
+                ByteBuffer header = ByteBuffer.allocate(4);
+                header.order(ByteOrder.LITTLE_ENDIAN);
+                header.putInt(documents.size());
+                out.write(header.array());
+
+                // Documents
+                for (String doc : documents) {
+                    byte[] textBytes = doc.getBytes(StandardCharsets.UTF_8);
+                    ByteBuffer lenBuf = ByteBuffer.allocate(4);
+                    lenBuf.order(ByteOrder.LITTLE_ENDIAN);
+                    lenBuf.putInt(textBytes.length);
+                    out.write(lenBuf.array());
+                    out.write(textBytes);
+                }
+                out.flush();
+            }
+
+            return Collections.singletonList(new ResultEntry(fileName, 
documents.size(), null));
+        } catch (IOException e) {
+            throw new RuntimeException("Failed to write test full-text index", 
e);
+        }
+    }
+}
diff --git 
a/paimon-common/src/test/java/org/apache/paimon/globalindex/testfulltext/TestFullTextGlobalIndexer.java
 
b/paimon-common/src/test/java/org/apache/paimon/globalindex/testfulltext/TestFullTextGlobalIndexer.java
new file mode 100644
index 0000000000..d9d3d14ff7
--- /dev/null
+++ 
b/paimon-common/src/test/java/org/apache/paimon/globalindex/testfulltext/TestFullTextGlobalIndexer.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.globalindex.testfulltext;
+
+import org.apache.paimon.globalindex.GlobalIndexIOMeta;
+import org.apache.paimon.globalindex.GlobalIndexReader;
+import org.apache.paimon.globalindex.GlobalIndexWriter;
+import org.apache.paimon.globalindex.GlobalIndexer;
+import org.apache.paimon.globalindex.io.GlobalIndexFileReader;
+import org.apache.paimon.globalindex.io.GlobalIndexFileWriter;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.VarCharType;
+
+import java.io.IOException;
+import java.util.List;
+
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
+/**
+ * A test-only {@link GlobalIndexer} for full-text search. Uses brute-force 
in-memory inverted index
+ * for text queries. No native library dependency required.
+ */
+public class TestFullTextGlobalIndexer implements GlobalIndexer {
+
+    private final DataType fieldType;
+
+    public TestFullTextGlobalIndexer(DataType fieldType, Options options) {
+        checkArgument(
+                fieldType instanceof VarCharType,
+                "TestFullTextGlobalIndexer only supports VARCHAR/STRING, but 
got: " + fieldType);
+        this.fieldType = fieldType;
+    }
+
+    @Override
+    public GlobalIndexWriter createWriter(GlobalIndexFileWriter fileWriter) 
throws IOException {
+        return new TestFullTextGlobalIndexWriter(fileWriter);
+    }
+
+    @Override
+    public GlobalIndexReader createReader(
+            GlobalIndexFileReader fileReader, List<GlobalIndexIOMeta> files) 
throws IOException {
+        checkArgument(files.size() == 1, "Expected exactly one index file per 
shard");
+        return new TestFullTextGlobalIndexReader(fileReader, files.get(0));
+    }
+}
diff --git 
a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
 
b/paimon-common/src/test/java/org/apache/paimon/globalindex/testfulltext/TestFullTextGlobalIndexerFactory.java
similarity index 50%
copy from 
paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
copy to 
paimon-common/src/test/java/org/apache/paimon/globalindex/testfulltext/TestFullTextGlobalIndexerFactory.java
index 2ad14bd055..b501c2df10 100644
--- 
a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
+++ 
b/paimon-common/src/test/java/org/apache/paimon/globalindex/testfulltext/TestFullTextGlobalIndexerFactory.java
@@ -16,21 +16,28 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.spark
+package org.apache.paimon.globalindex.testfulltext;
 
-import org.apache.paimon.partition.PartitionPredicate
-import org.apache.paimon.predicate.{Predicate, TopN, VectorSearch}
-import org.apache.paimon.table.InnerTable
+import org.apache.paimon.globalindex.GlobalIndexer;
+import org.apache.paimon.globalindex.GlobalIndexerFactory;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.types.DataField;
 
-import org.apache.spark.sql.types.StructType
+/**
+ * A test-only {@link GlobalIndexerFactory} for full-text search. Uses a 
brute-force in-memory
+ * linear scan (no inverted index); no native dependencies required.
+ */
+public class TestFullTextGlobalIndexerFactory implements GlobalIndexerFactory {
+
+    public static final String IDENTIFIER = "test-fulltext";
+
+    @Override
+    public String identifier() {
+        return IDENTIFIER;
+    }
 
-case class PaimonScan(
-    table: InnerTable,
-    requiredSchema: StructType,
-    pushedPartitionFilters: Seq[PartitionPredicate],
-    pushedDataFilters: Seq[Predicate],
-    override val pushedLimit: Option[Int] = None,
-    override val pushedTopN: Option[TopN] = None,
-    override val pushedVectorSearch: Option[VectorSearch] = None,
-    bucketedScanDisabled: Boolean = true)
-  extends PaimonBaseScan(table) {}
+    @Override
+    public GlobalIndexer create(DataField field, Options options) {
+        return new TestFullTextGlobalIndexer(field.type(), options);
+    }
+}
diff --git 
a/paimon-common/src/test/resources/META-INF/services/org.apache.paimon.globalindex.GlobalIndexerFactory
 
b/paimon-common/src/test/resources/META-INF/services/org.apache.paimon.globalindex.GlobalIndexerFactory
index 3fa7826081..841301f1b2 100644
--- 
a/paimon-common/src/test/resources/META-INF/services/org.apache.paimon.globalindex.GlobalIndexerFactory
+++ 
b/paimon-common/src/test/resources/META-INF/services/org.apache.paimon.globalindex.GlobalIndexerFactory
@@ -14,3 +14,4 @@
 # limitations under the License.
 
 org.apache.paimon.globalindex.testvector.TestVectorGlobalIndexerFactory
+org.apache.paimon.globalindex.testfulltext.TestFullTextGlobalIndexerFactory
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/FormatTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/FormatTable.java
index 8f298ff9bf..d7af8510a9 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/FormatTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/FormatTable.java
@@ -31,6 +31,7 @@ import org.apache.paimon.table.format.FormatBatchWriteBuilder;
 import org.apache.paimon.table.format.FormatReadBuilder;
 import org.apache.paimon.table.sink.BatchWriteBuilder;
 import org.apache.paimon.table.sink.StreamWriteBuilder;
+import org.apache.paimon.table.source.FullTextSearchBuilder;
 import org.apache.paimon.table.source.ReadBuilder;
 import org.apache.paimon.table.source.VectorSearchBuilder;
 import org.apache.paimon.types.RowType;
@@ -278,6 +279,12 @@ public interface FormatTable extends Table {
             throw new UnsupportedOperationException("FormatTable does not 
support vector search.");
         }
 
+        @Override
+        public FullTextSearchBuilder newFullTextSearchBuilder() {
+            throw new UnsupportedOperationException(
+                    "FormatTable does not support full-text search.");
+        }
+
         @Override
         public CatalogContext catalogContext() {
             return this.catalogContext;
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/FullTextSearchTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/FullTextSearchTable.java
new file mode 100644
index 0000000000..efe4a9f3d9
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/table/FullTextSearchTable.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table;
+
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.predicate.FullTextSearch;
+import org.apache.paimon.table.source.InnerTableRead;
+import org.apache.paimon.table.source.InnerTableScan;
+import org.apache.paimon.types.RowType;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A table wrapper to hold full-text search information. This is used to pass 
full-text search
+ * pushdown information from logical plan optimization to physical plan 
execution. For now, it is
+ * only used by internal for Spark engine.
+ */
+public class FullTextSearchTable implements ReadonlyTable {
+
+    private final InnerTable origin;
+    private final FullTextSearch fullTextSearch;
+
+    private FullTextSearchTable(InnerTable origin, FullTextSearch 
fullTextSearch) {
+        this.origin = origin;
+        this.fullTextSearch = fullTextSearch;
+    }
+
+    public static FullTextSearchTable create(InnerTable origin, FullTextSearch 
fullTextSearch) {
+        return new FullTextSearchTable(origin, fullTextSearch);
+    }
+
+    public FullTextSearch fullTextSearch() {
+        return fullTextSearch;
+    }
+
+    public InnerTable origin() {
+        return origin;
+    }
+
+    @Override
+    public String name() {
+        return origin.name();
+    }
+
+    @Override
+    public RowType rowType() {
+        return origin.rowType();
+    }
+
+    @Override
+    public List<String> primaryKeys() {
+        return origin.primaryKeys();
+    }
+
+    @Override
+    public List<String> partitionKeys() {
+        return origin.partitionKeys();
+    }
+
+    @Override
+    public Map<String, String> options() {
+        return origin.options();
+    }
+
+    @Override
+    public FileIO fileIO() {
+        return origin.fileIO();
+    }
+
+    @Override
+    public InnerTableRead newRead() {
+        return origin.newRead();
+    }
+
+    @Override
+    public InnerTableScan newScan() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Table copy(Map<String, String> dynamicOptions) {
+        return new FullTextSearchTable((InnerTable) 
origin.copy(dynamicOptions), fullTextSearch);
+    }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/InnerTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/InnerTable.java
index c404672e43..d360597744 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/InnerTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/InnerTable.java
@@ -25,6 +25,8 @@ import org.apache.paimon.table.sink.InnerTableWrite;
 import org.apache.paimon.table.sink.StreamWriteBuilder;
 import org.apache.paimon.table.sink.StreamWriteBuilderImpl;
 import org.apache.paimon.table.sink.WriteSelector;
+import org.apache.paimon.table.source.FullTextSearchBuilder;
+import org.apache.paimon.table.source.FullTextSearchBuilderImpl;
 import org.apache.paimon.table.source.InnerTableRead;
 import org.apache.paimon.table.source.InnerTableScan;
 import org.apache.paimon.table.source.ReadBuilder;
@@ -60,6 +62,11 @@ public interface InnerTable extends Table {
         return new VectorSearchBuilderImpl(this);
     }
 
+    @Override
+    default FullTextSearchBuilder newFullTextSearchBuilder() {
+        return new FullTextSearchBuilderImpl(this);
+    }
+
     @Override
     default BatchWriteBuilder newBatchWriteBuilder() {
         return new BatchWriteBuilderImpl(this);
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/Table.java 
b/paimon-core/src/main/java/org/apache/paimon/table/Table.java
index 938718db98..13d56d6849 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/Table.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/Table.java
@@ -28,6 +28,7 @@ import org.apache.paimon.manifest.ManifestFileMeta;
 import org.apache.paimon.stats.Statistics;
 import org.apache.paimon.table.sink.BatchWriteBuilder;
 import org.apache.paimon.table.sink.StreamWriteBuilder;
+import org.apache.paimon.table.source.FullTextSearchBuilder;
 import org.apache.paimon.table.source.ReadBuilder;
 import org.apache.paimon.table.source.VectorSearchBuilder;
 import org.apache.paimon.types.RowType;
@@ -217,6 +218,9 @@ public interface Table extends Serializable {
     /** Returns a new vector search builder. */
     VectorSearchBuilder newVectorSearchBuilder();
 
+    /** Returns a new full-text search builder. */
+    FullTextSearchBuilder newFullTextSearchBuilder();
+
     /** Returns a new read builder. */
     ReadBuilder newReadBuilder();
 
diff --git 
a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
 b/paimon-core/src/main/java/org/apache/paimon/table/source/FullTextRead.java
similarity index 55%
copy from 
paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
copy to 
paimon-core/src/main/java/org/apache/paimon/table/source/FullTextRead.java
index 2ad14bd055..77cc04b16d 100644
--- 
a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/FullTextRead.java
@@ -16,21 +16,18 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.spark
+package org.apache.paimon.table.source;
 
-import org.apache.paimon.partition.PartitionPredicate
-import org.apache.paimon.predicate.{Predicate, TopN, VectorSearch}
-import org.apache.paimon.table.InnerTable
+import org.apache.paimon.globalindex.GlobalIndexResult;
 
-import org.apache.spark.sql.types.StructType
+import java.util.List;
 
-case class PaimonScan(
-    table: InnerTable,
-    requiredSchema: StructType,
-    pushedPartitionFilters: Seq[PartitionPredicate],
-    pushedDataFilters: Seq[Predicate],
-    override val pushedLimit: Option[Int] = None,
-    override val pushedTopN: Option[TopN] = None,
-    override val pushedVectorSearch: Option[VectorSearch] = None,
-    bucketedScanDisabled: Boolean = true)
-  extends PaimonBaseScan(table) {}
/** Full-text read to read index files. */
public interface FullTextRead {

    /** Reads all splits of the given plan; convenience wrapper over {@link #read(List)}. */
    default GlobalIndexResult read(FullTextScan.Plan plan) {
        return read(plan.splits());
    }

    /** Reads the given splits and merges their per-split index results into one result. */
    GlobalIndexResult read(List<FullTextSearchSplit> splits);
}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/FullTextReadImpl.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/FullTextReadImpl.java
new file mode 100644
index 0000000000..f58f8f26ea
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/FullTextReadImpl.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.source;
+
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.globalindex.GlobalIndexIOMeta;
+import org.apache.paimon.globalindex.GlobalIndexReader;
+import org.apache.paimon.globalindex.GlobalIndexResult;
+import org.apache.paimon.globalindex.GlobalIndexer;
+import org.apache.paimon.globalindex.GlobalIndexerFactoryUtils;
+import org.apache.paimon.globalindex.OffsetGlobalIndexReader;
+import org.apache.paimon.globalindex.ScoredGlobalIndexResult;
+import org.apache.paimon.globalindex.io.GlobalIndexFileReader;
+import org.apache.paimon.index.GlobalIndexMeta;
+import org.apache.paimon.index.IndexFileMeta;
+import org.apache.paimon.index.IndexPathFactory;
+import org.apache.paimon.predicate.FullTextSearch;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.types.DataField;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+
+import static java.util.Collections.singletonList;
+import static 
org.apache.paimon.utils.ManifestReadThreadPool.randomlyExecuteSequentialReturn;
+import static org.apache.paimon.utils.Preconditions.checkNotNull;
+
+/** Implementation for {@link FullTextRead}. */
+public class FullTextReadImpl implements FullTextRead {
+
+    private final FileStoreTable table;
+    private final int limit;
+    private final DataField textColumn;
+    private final String queryText;
+
+    public FullTextReadImpl(
+            FileStoreTable table, int limit, DataField textColumn, String 
queryText) {
+        this.table = table;
+        this.limit = limit;
+        this.textColumn = textColumn;
+        this.queryText = queryText;
+    }
+
+    @Override
+    public GlobalIndexResult read(List<FullTextSearchSplit> splits) {
+        if (splits.isEmpty()) {
+            return GlobalIndexResult.createEmpty();
+        }
+
+        Integer threadNum = table.coreOptions().globalIndexThreadNum();
+
+        String indexType = 
splits.get(0).fullTextIndexFiles().get(0).indexType();
+        GlobalIndexer globalIndexer =
+                GlobalIndexerFactoryUtils.load(indexType)
+                        .create(textColumn, 
table.coreOptions().toConfiguration());
+        IndexPathFactory indexPathFactory = 
table.store().pathFactory().globalIndexFileFactory();
+        Iterator<Optional<ScoredGlobalIndexResult>> resultIterators =
+                randomlyExecuteSequentialReturn(
+                        split ->
+                                singletonList(
+                                        eval(
+                                                globalIndexer,
+                                                indexPathFactory,
+                                                split.rowRangeStart(),
+                                                split.rowRangeEnd(),
+                                                split.fullTextIndexFiles())),
+                        splits,
+                        threadNum);
+
+        ScoredGlobalIndexResult result = ScoredGlobalIndexResult.createEmpty();
+        while (resultIterators.hasNext()) {
+            Optional<ScoredGlobalIndexResult> next = resultIterators.next();
+            if (next.isPresent()) {
+                result = result.or(next.get());
+            }
+        }
+
+        return result.topK(limit);
+    }
+
+    private Optional<ScoredGlobalIndexResult> eval(
+            GlobalIndexer globalIndexer,
+            IndexPathFactory indexPathFactory,
+            long rowRangeStart,
+            long rowRangeEnd,
+            List<IndexFileMeta> fullTextIndexFiles) {
+        List<GlobalIndexIOMeta> indexIOMetaList = new ArrayList<>();
+        for (IndexFileMeta indexFile : fullTextIndexFiles) {
+            GlobalIndexMeta meta = checkNotNull(indexFile.globalIndexMeta());
+            indexIOMetaList.add(
+                    new GlobalIndexIOMeta(
+                            indexPathFactory.toPath(indexFile),
+                            indexFile.fileSize(),
+                            meta.indexMeta()));
+        }
+        @SuppressWarnings("resource")
+        FileIO fileIO = table.fileIO();
+        GlobalIndexFileReader indexFileReader = m -> 
fileIO.newInputStream(m.filePath());
+        try (GlobalIndexReader reader =
+                globalIndexer.createReader(indexFileReader, indexIOMetaList)) {
+            FullTextSearch fullTextSearch = new FullTextSearch(queryText, 
limit, textColumn.name());
+            return new OffsetGlobalIndexReader(reader, rowRangeStart, 
rowRangeEnd)
+                    .visitFullTextSearch(fullTextSearch);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+}
diff --git 
a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
 b/paimon-core/src/main/java/org/apache/paimon/table/source/FullTextScan.java
similarity index 55%
copy from 
paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
copy to 
paimon-core/src/main/java/org/apache/paimon/table/source/FullTextScan.java
index 2ad14bd055..563ffe1c9d 100644
--- 
a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/FullTextScan.java
@@ -16,21 +16,17 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.spark
+package org.apache.paimon.table.source;
 
-import org.apache.paimon.partition.PartitionPredicate
-import org.apache.paimon.predicate.{Predicate, TopN, VectorSearch}
-import org.apache.paimon.table.InnerTable
+import java.util.List;
 
-import org.apache.spark.sql.types.StructType
/** Full-text scan to scan index files. */
public interface FullTextScan {

    /** Plans the full-text index files to read, grouped into splits by row range. */
    Plan scan();

    /** Plan of full-text scan. */
    interface Plan {
        /** The splits to read; each split covers one distinct row range. */
        List<FullTextSearchSplit> splits();
    }
}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/FullTextScanImpl.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/FullTextScanImpl.java
new file mode 100644
index 0000000000..cc77d9121a
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/FullTextScanImpl.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.source;
+
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.index.GlobalIndexMeta;
+import org.apache.paimon.index.IndexFileHandler;
+import org.apache.paimon.index.IndexFileMeta;
+import org.apache.paimon.manifest.IndexManifestEntry;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.source.snapshot.TimeTravelUtil;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.utils.Filter;
+import org.apache.paimon.utils.Range;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+import static org.apache.paimon.utils.Preconditions.checkNotNull;
+
+/** Implementation for {@link FullTextScan}. */
+public class FullTextScanImpl implements FullTextScan {
+
+    private final FileStoreTable table;
+    private final DataField textColumn;
+
+    public FullTextScanImpl(FileStoreTable table, DataField textColumn) {
+        this.table = table;
+        this.textColumn = textColumn;
+    }
+
+    @Override
+    public Plan scan() {
+        Objects.requireNonNull(textColumn, "Text column must be set");
+
+        Snapshot snapshot = TimeTravelUtil.tryTravelOrLatest(table);
+        IndexFileHandler indexFileHandler = 
table.store().newIndexFileHandler();
+        Filter<IndexManifestEntry> indexFileFilter =
+                entry -> {
+                    GlobalIndexMeta globalIndex = 
entry.indexFile().globalIndexMeta();
+                    if (globalIndex == null) {
+                        return false;
+                    }
+                    return textColumn.id() == globalIndex.indexFieldId();
+                };
+
+        List<IndexFileMeta> allIndexFiles =
+                indexFileHandler.scan(snapshot, indexFileFilter).stream()
+                        .map(IndexManifestEntry::indexFile)
+                        .collect(Collectors.toList());
+
+        // Group full-text index files by (rowRangeStart, rowRangeEnd)
+        Map<Range, List<IndexFileMeta>> byRange = new HashMap<>();
+        for (IndexFileMeta indexFile : allIndexFiles) {
+            GlobalIndexMeta meta = checkNotNull(indexFile.globalIndexMeta());
+            Range range = new Range(meta.rowRangeStart(), meta.rowRangeEnd());
+            byRange.computeIfAbsent(range, k -> new 
ArrayList<>()).add(indexFile);
+        }
+
+        List<FullTextSearchSplit> splits = new ArrayList<>();
+        for (Map.Entry<Range, List<IndexFileMeta>> entry : byRange.entrySet()) 
{
+            Range range = entry.getKey();
+            splits.add(new FullTextSearchSplit(range.from, range.to, 
entry.getValue()));
+        }
+
+        return () -> splits;
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/FullTextSearchBuilder.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/FullTextSearchBuilder.java
new file mode 100644
index 0000000000..731962d0cf
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/FullTextSearchBuilder.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.source;
+
+import org.apache.paimon.globalindex.GlobalIndexResult;
+
+import java.io.Serializable;
+
/** Builder to build full-text search. */
public interface FullTextSearchBuilder extends Serializable {

    /** The top k results to return. */
    FullTextSearchBuilder withLimit(int limit);

    /** The text column to search; must name an indexed column of the table. */
    FullTextSearchBuilder withTextColumn(String name);

    /** The query text to search. */
    FullTextSearchBuilder withQueryText(String queryText);

    /** Create full-text scan to scan index files. */
    FullTextScan newFullTextScan();

    /** Create full-text read to read index files. */
    FullTextRead newFullTextRead();

    /** Execute full-text index search in local. */
    default GlobalIndexResult executeLocal() {
        // Plan the index-file splits, then read them all in this process.
        return newFullTextRead().read(newFullTextScan().scan());
    }
}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/FullTextSearchBuilderImpl.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/FullTextSearchBuilderImpl.java
new file mode 100644
index 0000000000..1165fcf205
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/FullTextSearchBuilderImpl.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.source;
+
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.InnerTable;
+import org.apache.paimon.types.DataField;
+
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+import static org.apache.paimon.utils.Preconditions.checkNotNull;
+
+/** Implementation for {@link FullTextSearchBuilder}. */
+public class FullTextSearchBuilderImpl implements FullTextSearchBuilder {
+
+    private static final long serialVersionUID = 1L;
+
+    private final FileStoreTable table;
+
+    private int limit;
+    private DataField textColumn;
+    private String queryText;
+
+    public FullTextSearchBuilderImpl(InnerTable table) {
+        this.table = (FileStoreTable) table;
+    }
+
+    @Override
+    public FullTextSearchBuilder withLimit(int limit) {
+        this.limit = limit;
+        return this;
+    }
+
+    @Override
+    public FullTextSearchBuilder withTextColumn(String name) {
+        this.textColumn = table.rowType().getField(name);
+        return this;
+    }
+
+    @Override
+    public FullTextSearchBuilder withQueryText(String queryText) {
+        this.queryText = queryText;
+        return this;
+    }
+
+    @Override
+    public FullTextScan newFullTextScan() {
+        checkNotNull(textColumn, "Text column must be set via 
withTextColumn()");
+        return new FullTextScanImpl(table, textColumn);
+    }
+
+    @Override
+    public FullTextRead newFullTextRead() {
+        checkArgument(limit > 0, "Limit must be positive, set via 
withLimit()");
+        checkNotNull(textColumn, "Text column must be set via 
withTextColumn()");
+        checkNotNull(queryText, "Query text must be set via withQueryText()");
+        return new FullTextReadImpl(table, limit, textColumn, queryText);
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/FullTextSearchSplit.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/FullTextSearchSplit.java
new file mode 100644
index 0000000000..d230482186
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/FullTextSearchSplit.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.source;
+
+import org.apache.paimon.index.IndexFileMeta;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Objects;
+
+/** Split of full-text search. */
+public class FullTextSearchSplit implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private final long rowRangeStart;
+    private final long rowRangeEnd;
+    private final List<IndexFileMeta> fullTextIndexFiles;
+
+    public FullTextSearchSplit(
+            long rowRangeStart, long rowRangeEnd, List<IndexFileMeta> 
fullTextIndexFiles) {
+        this.rowRangeStart = rowRangeStart;
+        this.rowRangeEnd = rowRangeEnd;
+        this.fullTextIndexFiles = fullTextIndexFiles;
+    }
+
+    public long rowRangeStart() {
+        return rowRangeStart;
+    }
+
+    public long rowRangeEnd() {
+        return rowRangeEnd;
+    }
+
+    public List<IndexFileMeta> fullTextIndexFiles() {
+        return fullTextIndexFiles;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        FullTextSearchSplit that = (FullTextSearchSplit) o;
+        return rowRangeStart == that.rowRangeStart
+                && rowRangeEnd == that.rowRangeEnd
+                && Objects.equals(fullTextIndexFiles, that.fullTextIndexFiles);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(rowRangeStart, rowRangeEnd, fullTextIndexFiles);
+    }
+
+    @Override
+    public String toString() {
+        return "FullTextSearchSplit{"
+                + "rowRangeStart="
+                + rowRangeStart
+                + ", rowRangeEnd="
+                + rowRangeEnd
+                + ", fullTextIndexFiles="
+                + fullTextIndexFiles
+                + '}';
+    }
+}
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/source/FullTextSearchBuilderTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/source/FullTextSearchBuilderTest.java
new file mode 100644
index 0000000000..c52a2a46af
--- /dev/null
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/source/FullTextSearchBuilderTest.java
@@ -0,0 +1,424 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.source;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.globalindex.GlobalIndexBuilderUtils;
+import org.apache.paimon.globalindex.GlobalIndexResult;
+import org.apache.paimon.globalindex.GlobalIndexSingletonWriter;
+import org.apache.paimon.globalindex.ResultEntry;
+import org.apache.paimon.globalindex.ScoredGlobalIndexResult;
+import 
org.apache.paimon.globalindex.testfulltext.TestFullTextGlobalIndexerFactory;
+import org.apache.paimon.index.IndexFileMeta;
+import org.apache.paimon.io.CompactIncrement;
+import org.apache.paimon.io.DataIncrement;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.TableTestBase;
+import org.apache.paimon.table.sink.BatchTableCommit;
+import org.apache.paimon.table.sink.BatchTableWrite;
+import org.apache.paimon.table.sink.BatchWriteBuilder;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.sink.CommitMessageImpl;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.utils.Range;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link FullTextSearchBuilder} using test-only brute-force 
full-text index. */
+public class FullTextSearchBuilderTest extends TableTestBase {
+
+    private static final String TEXT_FIELD_NAME = "content";
+
    @Override
    protected Schema schemaDefault() {
        // Unaware-bucket table with an int id and a text column. Row tracking and
        // data evolution are enabled — presumably required for row-address-based
        // global indexes; TODO confirm against the global index writer path.
        return Schema.newBuilder()
                .column("id", DataTypes.INT())
                .column(TEXT_FIELD_NAME, DataTypes.STRING())
                .option(CoreOptions.BUCKET.key(), "-1")
                .option(CoreOptions.ROW_TRACKING_ENABLED.key(), "true")
                .option(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true")
                .build();
    }
+
    @Test
    public void testFullTextSearchEndToEnd() throws Exception {
        // End-to-end: write rows, build a full-text index, search, then read the
        // matched rows back through a scan seeded with the global index result.
        createTableDefault();
        FileStoreTable table = getTableDefault();

        String[] documents = {
            "Apache Paimon is a lake format",
            "Paimon supports full-text search",
            "Vector search is also supported",
            "Paimon provides streaming and batch processing",
            "Full-text indexing enables fast text queries",
            "The lake format supports ACID transactions"
        };

        writeDocuments(table, documents);
        buildAndCommitIndex(table, documents);

        // Query "Paimon" - should match rows 0, 1, 3
        GlobalIndexResult result =
                table.newFullTextSearchBuilder()
                        .withQueryText("Paimon")
                        .withLimit(3)
                        .withTextColumn(TEXT_FIELD_NAME)
                        .executeLocal();

        assertThat(result).isInstanceOf(ScoredGlobalIndexResult.class);
        assertThat(result.results().isEmpty()).isFalse();

        // Read using the search result
        ReadBuilder readBuilder = table.newReadBuilder();
        List<Integer> ids = new ArrayList<>();
        TableScan.Plan plan = readBuilder.newScan().withGlobalIndexResult(result).plan();
        try (RecordReader<InternalRow> reader = readBuilder.newRead().createReader(plan)) {
            reader.forEachRemaining(row -> ids.add(row.getInt(0)));
        }

        assertThat(ids).isNotEmpty();
        assertThat(ids.size()).isLessThanOrEqualTo(3);
        // Rows 0, 1, 3 contain "Paimon"
        assertThat(ids).containsAnyOf(0, 1, 3);
    }
+
    @Test
    public void testFullTextSearchMultiTermQuery() throws Exception {
        // Multi-term query: with limit=2, the two documents matching both terms
        // should win over documents matching only one.
        createTableDefault();
        FileStoreTable table = getTableDefault();

        String[] documents = {
            "Apache Paimon lake format",
            "Paimon full-text search support",
            "full-text search in Apache Paimon",
            "Vector search capabilities",
        };

        writeDocuments(table, documents);
        buildAndCommitIndex(table, documents);

        // Query "Paimon search" - row 2 matches both terms, rows 1 matches both, row 0 matches one
        GlobalIndexResult result =
                table.newFullTextSearchBuilder()
                        .withQueryText("Paimon search")
                        .withLimit(2)
                        .withTextColumn(TEXT_FIELD_NAME)
                        .executeLocal();

        assertThat(result).isInstanceOf(ScoredGlobalIndexResult.class);
        assertThat(result.results().isEmpty()).isFalse();

        ReadBuilder readBuilder = table.newReadBuilder();
        TableScan.Plan plan = readBuilder.newScan().withGlobalIndexResult(result).plan();
        List<Integer> ids = new ArrayList<>();
        try (RecordReader<InternalRow> reader = readBuilder.newRead().createReader(plan)) {
            reader.forEachRemaining(row -> ids.add(row.getInt(0)));
        }

        assertThat(ids).hasSize(2);
        // Rows 1 and 2 contain both "Paimon" and "search"
        assertThat(ids).contains(1, 2);
    }
+
    @Test
    public void testFullTextSearchEmptyResult() throws Exception {
        // Without a committed index, the scan yields no splits and the search
        // must return an empty result rather than fail.
        createTableDefault();
        FileStoreTable table = getTableDefault();

        // Write data but no index - should return empty result
        String[] documents = {"hello world", "foo bar"};
        writeDocuments(table, documents);

        GlobalIndexResult result =
                table.newFullTextSearchBuilder()
                        .withQueryText("nonexistent")
                        .withLimit(1)
                        .withTextColumn(TEXT_FIELD_NAME)
                        .executeLocal();

        assertThat(result.results().isEmpty()).isTrue();
    }
+
    @Test
    public void testFullTextSearchTopKLimit() throws Exception {
        // All 20 documents match the query; the limit must cap how many rows
        // the seeded read returns.
        createTableDefault();
        FileStoreTable table = getTableDefault();

        String[] documents = new String[20];
        for (int i = 0; i < 20; i++) {
            documents[i] = "document number " + i + " with common keyword";
        }

        writeDocuments(table, documents);
        buildAndCommitIndex(table, documents);

        // Search with limit=5
        GlobalIndexResult result =
                table.newFullTextSearchBuilder()
                        .withQueryText("keyword")
                        .withLimit(5)
                        .withTextColumn(TEXT_FIELD_NAME)
                        .executeLocal();

        ReadBuilder readBuilder = table.newReadBuilder();
        TableScan.Plan plan = readBuilder.newScan().withGlobalIndexResult(result).plan();
        List<Integer> ids = new ArrayList<>();
        try (RecordReader<InternalRow> reader = readBuilder.newRead().createReader(plan)) {
            reader.forEachRemaining(row -> ids.add(row.getInt(0)));
        }

        assertThat(ids.size()).isLessThanOrEqualTo(5);
    }
+
    @Test
    public void testFullTextSearchWithMultipleIndexFiles() throws Exception {
        // Two index files with disjoint row ranges become separate splits; a single
        // query must merge hits from both files.
        createTableDefault();
        FileStoreTable table = getTableDefault();

        String[] allDocuments = {
            "Apache Paimon lake format", // row 0
            "Paimon supports streaming", // row 1
            "batch processing engine", // row 2
            "Paimon full-text search", // row 3
            "vector similarity search", // row 4
            "Paimon ACID transactions" // row 5
        };

        writeDocuments(table, allDocuments);

        // Build two separate index files covering different row ranges
        buildAndCommitMultipleIndexFiles(table, allDocuments);

        // Query "Paimon" - results should span across both index files
        GlobalIndexResult result =
                table.newFullTextSearchBuilder()
                        .withQueryText("Paimon")
                        .withLimit(4)
                        .withTextColumn(TEXT_FIELD_NAME)
                        .executeLocal();

        assertThat(result).isInstanceOf(ScoredGlobalIndexResult.class);
        assertThat(result.results().isEmpty()).isFalse();

        ReadBuilder readBuilder = table.newReadBuilder();
        TableScan.Plan plan = readBuilder.newScan().withGlobalIndexResult(result).plan();
        List<Integer> ids = new ArrayList<>();
        try (RecordReader<InternalRow> reader = readBuilder.newRead().createReader(plan)) {
            reader.forEachRemaining(row -> ids.add(row.getInt(0)));
        }

        assertThat(ids).isNotEmpty();
        // Rows 0,1 are in first index file, rows 3,5 are in second
        assertThat(ids).containsAnyOf(0, 1);
        assertThat(ids).containsAnyOf(3, 5);
    }
+
+    @Test
+    public void testFullTextSearchNoMatchingDocuments() throws Exception {
+        createTableDefault();
+        FileStoreTable table = getTableDefault();
+
+        String[] documents = {
+            "Apache Paimon lake format", "streaming batch processing", "ACID 
transactions support"
+        };
+
+        writeDocuments(table, documents);
+        buildAndCommitIndex(table, documents);
+
+        // Query a term that doesn't exist in any document
+        GlobalIndexResult result =
+                table.newFullTextSearchBuilder()
+                        .withQueryText("nonexistent")
+                        .withLimit(3)
+                        .withTextColumn(TEXT_FIELD_NAME)
+                        .executeLocal();
+
+        assertThat(result.results().isEmpty()).isTrue();
+    }
+
+    @Test
+    public void testFullTextSearchCaseInsensitive() throws Exception {
+        createTableDefault();
+        FileStoreTable table = getTableDefault();
+
+        String[] documents = {
+            "Apache PAIMON Lake Format", "paimon supports search", "Paimon Is 
Great"
+        };
+
+        writeDocuments(table, documents);
+        buildAndCommitIndex(table, documents);
+
+        // Query lowercase "paimon" should match all three (case-insensitive)
+        GlobalIndexResult result =
+                table.newFullTextSearchBuilder()
+                        .withQueryText("paimon")
+                        .withLimit(3)
+                        .withTextColumn(TEXT_FIELD_NAME)
+                        .executeLocal();
+
+        assertThat(result).isInstanceOf(ScoredGlobalIndexResult.class);
+
+        ReadBuilder readBuilder = table.newReadBuilder();
+        TableScan.Plan plan = 
readBuilder.newScan().withGlobalIndexResult(result).plan();
+        List<Integer> ids = new ArrayList<>();
+        try (RecordReader<InternalRow> reader = 
readBuilder.newRead().createReader(plan)) {
+            reader.forEachRemaining(row -> ids.add(row.getInt(0)));
+        }
+
+        assertThat(ids).hasSize(3);
+        assertThat(ids).contains(0, 1, 2);
+    }
+
+    // ====================== Helper methods ======================
+
+    private void writeDocuments(FileStoreTable table, String[] documents) 
throws Exception {
+        BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
+        try (BatchTableWrite write = writeBuilder.newWrite();
+                BatchTableCommit commit = writeBuilder.newCommit()) {
+            for (int i = 0; i < documents.length; i++) {
+                write.write(GenericRow.of(i, 
BinaryString.fromString(documents[i])));
+            }
+            commit.commit(write.prepareCommit());
+        }
+    }
+
+    private void buildAndCommitIndex(FileStoreTable table, String[] documents) 
throws Exception {
+        Options options = table.coreOptions().toConfiguration();
+        DataField textField = table.rowType().getField(TEXT_FIELD_NAME);
+
+        GlobalIndexSingletonWriter writer =
+                (GlobalIndexSingletonWriter)
+                        GlobalIndexBuilderUtils.createIndexWriter(
+                                table,
+                                TestFullTextGlobalIndexerFactory.IDENTIFIER,
+                                textField,
+                                options);
+        for (String doc : documents) {
+            writer.write(doc);
+        }
+        List<ResultEntry> entries = writer.finish();
+
+        Range rowRange = new Range(0, documents.length - 1);
+        List<IndexFileMeta> indexFiles =
+                GlobalIndexBuilderUtils.toIndexFileMetas(
+                        table.fileIO(),
+                        table.store().pathFactory().globalIndexFileFactory(),
+                        table.coreOptions(),
+                        rowRange,
+                        textField.id(),
+                        TestFullTextGlobalIndexerFactory.IDENTIFIER,
+                        entries);
+
+        DataIncrement dataIncrement = DataIncrement.indexIncrement(indexFiles);
+        CommitMessage message =
+                new CommitMessageImpl(
+                        BinaryRow.EMPTY_ROW,
+                        0,
+                        null,
+                        dataIncrement,
+                        CompactIncrement.emptyIncrement());
+        try (BatchTableCommit commit = 
table.newBatchWriteBuilder().newCommit()) {
+            commit.commit(Collections.singletonList(message));
+        }
+    }
+
+    private void buildAndCommitMultipleIndexFiles(FileStoreTable table, 
String[] documents)
+            throws Exception {
+        Options options = table.coreOptions().toConfiguration();
+        DataField textField = table.rowType().getField(TEXT_FIELD_NAME);
+        int mid = documents.length / 2;
+
+        // Build first index file covering rows [0, mid)
+        GlobalIndexSingletonWriter writer1 =
+                (GlobalIndexSingletonWriter)
+                        GlobalIndexBuilderUtils.createIndexWriter(
+                                table,
+                                TestFullTextGlobalIndexerFactory.IDENTIFIER,
+                                textField,
+                                options);
+        for (int i = 0; i < mid; i++) {
+            writer1.write(documents[i]);
+        }
+        List<ResultEntry> entries1 = writer1.finish();
+        Range rowRange1 = new Range(0, mid - 1);
+        List<IndexFileMeta> indexFiles1 =
+                GlobalIndexBuilderUtils.toIndexFileMetas(
+                        table.fileIO(),
+                        table.store().pathFactory().globalIndexFileFactory(),
+                        table.coreOptions(),
+                        rowRange1,
+                        textField.id(),
+                        TestFullTextGlobalIndexerFactory.IDENTIFIER,
+                        entries1);
+
+        // Build second index file covering rows [mid, end)
+        GlobalIndexSingletonWriter writer2 =
+                (GlobalIndexSingletonWriter)
+                        GlobalIndexBuilderUtils.createIndexWriter(
+                                table,
+                                TestFullTextGlobalIndexerFactory.IDENTIFIER,
+                                textField,
+                                options);
+        for (int i = mid; i < documents.length; i++) {
+            writer2.write(documents[i]);
+        }
+        List<ResultEntry> entries2 = writer2.finish();
+        Range rowRange2 = new Range(mid, documents.length - 1);
+        List<IndexFileMeta> indexFiles2 =
+                GlobalIndexBuilderUtils.toIndexFileMetas(
+                        table.fileIO(),
+                        table.store().pathFactory().globalIndexFileFactory(),
+                        table.coreOptions(),
+                        rowRange2,
+                        textField.id(),
+                        TestFullTextGlobalIndexerFactory.IDENTIFIER,
+                        entries2);
+
+        // Combine all index files and commit together
+        List<IndexFileMeta> allIndexFiles = new ArrayList<>();
+        allIndexFiles.addAll(indexFiles1);
+        allIndexFiles.addAll(indexFiles2);
+
+        DataIncrement dataIncrement = 
DataIncrement.indexIncrement(allIndexFiles);
+        CommitMessage message =
+                new CommitMessageImpl(
+                        BinaryRow.EMPTY_ROW,
+                        0,
+                        null,
+                        dataIncrement,
+                        CompactIncrement.emptyIncrement());
+        try (BatchTableCommit commit = 
table.newBatchWriteBuilder().newCommit()) {
+            commit.commit(Collections.singletonList(message));
+        }
+    }
+}
diff --git 
a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonBaseScanBuilder.scala
 
b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonBaseScanBuilder.scala
index 4f5451c95d..f6e3b97c81 100644
--- 
a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonBaseScanBuilder.scala
+++ 
b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonBaseScanBuilder.scala
@@ -22,6 +22,7 @@ import org.apache.paimon.CoreOptions
 import org.apache.paimon.partition.PartitionPredicate
 import 
org.apache.paimon.partition.PartitionPredicate.splitPartitionPredicatesAndDataPredicates
 import org.apache.paimon.predicate.{PartitionPredicateVisitor, Predicate, 
TopN, VectorSearch}
+import org.apache.paimon.predicate.FullTextSearch
 import org.apache.paimon.table.SpecialFields.rowTypeWithRowTracking
 import org.apache.paimon.table.Table
 import org.apache.paimon.types.RowType
@@ -53,6 +54,7 @@ abstract class PaimonBaseScanBuilder
   protected var pushedLimit: Option[Int] = None
   protected var pushedTopN: Option[TopN] = None
   protected var pushedVectorSearch: Option[VectorSearch] = None
+  protected var pushedFullTextSearch: Option[FullTextSearch] = None
 
   protected var requiredSchema: StructType = 
SparkTypeUtils.fromPaimonRowType(table.rowType())
 
diff --git 
a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
 
b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
index 2ad14bd055..f5a5c8e95e 100644
--- 
a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
+++ 
b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
@@ -19,7 +19,7 @@
 package org.apache.paimon.spark
 
 import org.apache.paimon.partition.PartitionPredicate
-import org.apache.paimon.predicate.{Predicate, TopN, VectorSearch}
+import org.apache.paimon.predicate.{FullTextSearch, Predicate, TopN, 
VectorSearch}
 import org.apache.paimon.table.InnerTable
 
 import org.apache.spark.sql.types.StructType
@@ -32,5 +32,6 @@ case class PaimonScan(
     override val pushedLimit: Option[Int] = None,
     override val pushedTopN: Option[TopN] = None,
     override val pushedVectorSearch: Option[VectorSearch] = None,
+    override val pushedFullTextSearch: Option[FullTextSearch] = None,
     bucketedScanDisabled: Boolean = true)
   extends PaimonBaseScan(table) {}
diff --git 
a/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
 
b/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
index 1da9b17a81..707ee13459 100644
--- 
a/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
+++ 
b/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
@@ -19,7 +19,7 @@
 package org.apache.paimon.spark
 
 import org.apache.paimon.partition.PartitionPredicate
-import org.apache.paimon.predicate.{Predicate, TopN, VectorSearch}
+import org.apache.paimon.predicate.{FullTextSearch, Predicate, TopN, 
VectorSearch}
 import org.apache.paimon.table.{BucketMode, FileStoreTable, InnerTable}
 import org.apache.paimon.table.source.{DataSplit, Split}
 
@@ -36,6 +36,7 @@ case class PaimonScan(
     override val pushedLimit: Option[Int],
     override val pushedTopN: Option[TopN],
     override val pushedVectorSearch: Option[VectorSearch],
+    override val pushedFullTextSearch: Option[FullTextSearch] = None,
     bucketedScanDisabled: Boolean = false)
   extends PaimonBaseScan(table)
   with SupportsReportPartitioning {
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScan.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScan.scala
index bd884d64b1..30a1621925 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScan.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScan.scala
@@ -45,7 +45,7 @@ abstract class PaimonBaseScan(table: InnerTable)
   protected def getInputSplits: Array[Split] = {
     readBuilder
       .newScan()
-      .withGlobalIndexResult(evalVectorSearch())
+      .withGlobalIndexResult(evalGlobalIndexSearch())
       .asInstanceOf[InnerTableScan]
       .withMetricRegistry(paimonMetricsRegistry)
       .plan()
@@ -54,11 +54,21 @@ abstract class PaimonBaseScan(table: InnerTable)
       .toArray
   }
 
-  private def evalVectorSearch(): GlobalIndexResult = {
-    if (pushedVectorSearch.isEmpty) {
-      return null
+  private def evalGlobalIndexSearch(): GlobalIndexResult = {
+    if (pushedVectorSearch.isDefined && pushedFullTextSearch.isDefined) {
+      throw new UnsupportedOperationException(
+        "Cannot push down both vector search and full-text search 
simultaneously.")
+    }
+    if (pushedVectorSearch.isDefined) {
+      return evalVectorSearch()
     }
+    if (pushedFullTextSearch.isDefined) {
+      return evalFullTextSearch()
+    }
+    null
+  }
 
+  private def evalVectorSearch(): GlobalIndexResult = {
     val vectorSearch = pushedVectorSearch.get
     val vectorBuilder = table
       .newVectorSearchBuilder()
@@ -74,6 +84,16 @@ abstract class PaimonBaseScan(table: InnerTable)
     vectorBuilder.newVectorRead().read(vectorBuilder.newVectorScan().scan())
   }
 
+  private def evalFullTextSearch(): GlobalIndexResult = {
+    val fullTextSearch = pushedFullTextSearch.get
+    val ftBuilder = table
+      .newFullTextSearchBuilder()
+      .withQueryText(fullTextSearch.queryText())
+      .withTextColumn(fullTextSearch.fieldName())
+      .withLimit(fullTextSearch.limit())
+    ftBuilder.newFullTextRead().read(ftBuilder.newFullTextScan().scan())
+  }
+
   override def toBatch: Batch = {
     ensureNoFullScan()
     super.toBatch
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScanBuilder.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScanBuilder.scala
index 47723171e4..14a090ff14 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScanBuilder.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScanBuilder.scala
@@ -22,6 +22,7 @@ import org.apache.paimon.CoreOptions
 import org.apache.paimon.partition.PartitionPredicate
 import 
org.apache.paimon.partition.PartitionPredicate.splitPartitionPredicatesAndDataPredicates
 import org.apache.paimon.predicate.{PartitionPredicateVisitor, Predicate, 
TopN, VectorSearch}
+import org.apache.paimon.predicate.FullTextSearch
 import org.apache.paimon.table.{SpecialFields, Table}
 import org.apache.paimon.types.RowType
 
@@ -53,6 +54,7 @@ abstract class PaimonBaseScanBuilder
   protected var pushedLimit: Option[Int] = None
   protected var pushedTopN: Option[TopN] = None
   protected var pushedVectorSearch: Option[VectorSearch] = None
+  protected var pushedFullTextSearch: Option[FullTextSearch] = None
 
   protected var requiredSchema: StructType = 
SparkTypeUtils.fromPaimonRowType(table.rowType())
 
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
index 12471f5787..0fa4d73054 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
@@ -20,7 +20,7 @@ package org.apache.paimon.spark
 
 import org.apache.paimon.CoreOptions.BucketFunctionType
 import org.apache.paimon.partition.PartitionPredicate
-import org.apache.paimon.predicate.{Predicate, TopN, VectorSearch}
+import org.apache.paimon.predicate.{FullTextSearch, Predicate, TopN, 
VectorSearch}
 import org.apache.paimon.spark.commands.BucketExpression.quote
 import org.apache.paimon.table.{BucketMode, FileStoreTable, InnerTable}
 import org.apache.paimon.table.source.{DataSplit, Split}
@@ -42,6 +42,7 @@ case class PaimonScan(
     override val pushedLimit: Option[Int],
     override val pushedTopN: Option[TopN],
     override val pushedVectorSearch: Option[VectorSearch],
+    override val pushedFullTextSearch: Option[FullTextSearch] = None,
     bucketedScanDisabled: Boolean = false)
   extends PaimonBaseScan(table)
   with SupportsReportPartitioning
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScanBuilder.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScanBuilder.scala
index 2bf7a8ddd4..7e1989283b 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScanBuilder.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScanBuilder.scala
@@ -128,7 +128,7 @@ class PaimonScanBuilder(val table: InnerTable)
     localScan match {
       case Some(scan) => scan
       case None =>
-        val (actualTable, vectorSearch) = table match {
+        val (actualTable, vectorSearch, fullTextSearch) = table match {
           case vst: org.apache.paimon.table.VectorSearchTable =>
             val tableVectorSearch = Option(vst.vectorSearch())
             val vs = (tableVectorSearch, pushedVectorSearch) match {
@@ -136,8 +136,10 @@ class PaimonScanBuilder(val table: InnerTable)
               case (None, Some(_)) => pushedVectorSearch
               case (None, None) => None
             }
-            (vst.origin(), vs)
-          case _ => (table, pushedVectorSearch)
+            (vst.origin(), vs, None)
+          case ftst: org.apache.paimon.table.FullTextSearchTable =>
+            (ftst.origin(), None, Option(ftst.fullTextSearch()))
+          case _ => (table, pushedVectorSearch, pushedFullTextSearch)
         }
 
         PaimonScan(
@@ -147,7 +149,8 @@ class PaimonScanBuilder(val table: InnerTable)
           pushedDataFilters,
           pushedLimit,
           pushedTopN,
-          vectorSearch)
+          vectorSearch,
+          fullTextSearch)
     }
   }
 }
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/PaimonTableValuedFunctions.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/PaimonTableValuedFunctions.scala
index 6bb6004db8..d570f60303 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/PaimonTableValuedFunctions.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/PaimonTableValuedFunctions.scala
@@ -19,10 +19,10 @@
 package org.apache.paimon.spark.catalyst.plans.logical
 
 import org.apache.paimon.CoreOptions
-import org.apache.paimon.predicate.VectorSearch
+import org.apache.paimon.predicate.{FullTextSearch, VectorSearch}
 import org.apache.paimon.spark.SparkTable
 import 
org.apache.paimon.spark.catalyst.plans.logical.PaimonTableValuedFunctions._
-import org.apache.paimon.table.{DataTable, InnerTable, VectorSearchTable}
+import org.apache.paimon.table.{DataTable, FullTextSearchTable, InnerTable, 
VectorSearchTable}
 import 
org.apache.paimon.table.source.snapshot.TimeTravelUtil.InconsistentTagBucketException
 
 import org.apache.spark.sql.PaimonUtils.createDataset
@@ -44,9 +44,15 @@ object PaimonTableValuedFunctions {
   val INCREMENTAL_BETWEEN_TIMESTAMP = "paimon_incremental_between_timestamp"
   val INCREMENTAL_TO_AUTO_TAG = "paimon_incremental_to_auto_tag"
   val VECTOR_SEARCH = "vector_search"
+  val FULL_TEXT_SEARCH = "full_text_search"
 
   val supportedFnNames: Seq[String] =
-    Seq(INCREMENTAL_QUERY, INCREMENTAL_BETWEEN_TIMESTAMP, 
INCREMENTAL_TO_AUTO_TAG, VECTOR_SEARCH)
+    Seq(
+      INCREMENTAL_QUERY,
+      INCREMENTAL_BETWEEN_TIMESTAMP,
+      INCREMENTAL_TO_AUTO_TAG,
+      VECTOR_SEARCH,
+      FULL_TEXT_SEARCH)
 
   private type TableFunctionDescription = (FunctionIdentifier, ExpressionInfo, 
TableFunctionBuilder)
 
@@ -60,6 +66,8 @@ object PaimonTableValuedFunctions {
         FunctionRegistryBase.build[IncrementalToAutoTag](fnName, since = None)
       case VECTOR_SEARCH =>
         FunctionRegistryBase.build[VectorSearchQuery](fnName, since = None)
+      case FULL_TEXT_SEARCH =>
+        FunctionRegistryBase.build[FullTextSearchQuery](fnName, since = None)
       case _ =>
         throw new Exception(s"Function $fnName isn't a supported table valued 
function.")
     }
@@ -90,10 +98,12 @@ object PaimonTableValuedFunctions {
     val ident: Identifier = Identifier.of(Array(dbName), tableName)
     val sparkTable = sparkCatalog.loadTable(ident)
 
-    // Handle vector_search specially
+    // Handle vector_search and full_text_search specially
     tvf match {
       case vsq: VectorSearchQuery =>
         resolveVectorSearchQuery(sparkTable, sparkCatalog, ident, vsq, 
args.tail)
+      case ftsq: FullTextSearchQuery =>
+        resolveFullTextSearchQuery(sparkTable, sparkCatalog, ident, ftsq, 
args.tail)
       case _ =>
         val options = tvf.parseArgs(args.tail)
         usingSparkIncrementQuery(tvf, sparkTable, options) match {
@@ -131,6 +141,28 @@ object PaimonTableValuedFunctions {
     }
   }
 
+  private def resolveFullTextSearchQuery(
+      sparkTable: Table,
+      sparkCatalog: TableCatalog,
+      ident: Identifier,
+      ftsq: FullTextSearchQuery,
+      argsWithoutTable: Seq[Expression]): LogicalPlan = {
+    sparkTable match {
+      case st @ SparkTable(innerTable: InnerTable) =>
+        val fullTextSearch = ftsq.createFullTextSearch(innerTable, 
argsWithoutTable)
+        val fullTextSearchTable = FullTextSearchTable.create(innerTable, 
fullTextSearch)
+        DataSourceV2Relation.create(
+          st.copy(table = fullTextSearchTable),
+          Some(sparkCatalog),
+          Some(ident),
+          CaseInsensitiveStringMap.empty())
+      case _ =>
+        throw new RuntimeException(
+          "full_text_search only supports Paimon SparkTable backed by 
InnerTable, " +
+            s"but got table implementation: ${sparkTable.getClass.getName}")
+    }
+  }
+
   private def usingSparkIncrementQuery(
       tvf: PaimonTableValueFunction,
       sparkTable: Table,
@@ -306,3 +338,52 @@ case class VectorSearchQuery(override val args: 
Seq[Expression])
     }
   }
 }
+
+/**
+ * Plan for the [[FULL_TEXT_SEARCH]] table-valued function.
+ *
+ * Usage: full_text_search(table_name, column_name, query_text, limit)
+ *   - table_name: the Paimon table to search
+ *   - column_name: the text column name
+ *   - query_text: the query text string
+ *   - limit: the number of top results to return
+ *
+ * Example: SELECT * FROM full_text_search('T', 'content', 'hello world', 10)
+ */
+case class FullTextSearchQuery(override val args: Seq[Expression])
+  extends PaimonTableValueFunction(FULL_TEXT_SEARCH) {
+
+  override def parseArgs(args: Seq[Expression]): Map[String, String] = {
+    // This method is not used for FullTextSearchQuery as we handle it 
specially
+    Map.empty
+  }
+
+  def createFullTextSearch(
+      innerTable: InnerTable,
+      argsWithoutTable: Seq[Expression]): FullTextSearch = {
+    if (argsWithoutTable.size != 3) {
+      throw new RuntimeException(
+        s"$FULL_TEXT_SEARCH needs three parameters after table_name: 
column_name, query_text, limit. " +
+          s"Got ${argsWithoutTable.size} parameters after table_name."
+      )
+    }
+    val columnName = argsWithoutTable.head.eval().toString
+    if (!innerTable.rowType().containsField(columnName)) {
+      throw new RuntimeException(
+        s"Column $columnName does not exist in table ${innerTable.name()}"
+      )
+    }
+    val queryText = argsWithoutTable(1).eval().toString
+    val limit = argsWithoutTable(2).eval() match {
+      case i: Int => i
+      case l: Long => l.toInt
+      case other => throw new RuntimeException(s"Invalid limit type: 
${other.getClass.getName}")
+    }
+    if (limit <= 0) {
+      throw new IllegalArgumentException(
+        s"Limit must be a positive integer, but got: $limit"
+      )
+    }
+    new FullTextSearch(queryText, limit, columnName)
+  }
+}
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/read/BaseScan.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/read/BaseScan.scala
index 5260ef8e09..42f724d3e6 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/read/BaseScan.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/read/BaseScan.scala
@@ -20,7 +20,7 @@ package org.apache.paimon.spark.read
 
 import org.apache.paimon.CoreOptions
 import org.apache.paimon.partition.PartitionPredicate
-import org.apache.paimon.predicate.{Predicate, TopN, VectorSearch}
+import org.apache.paimon.predicate.{FullTextSearch, Predicate, TopN, 
VectorSearch}
 import org.apache.paimon.spark.{PaimonBatch, PaimonInputPartition, 
PaimonNumSplitMetric, PaimonPartitionSizeMetric, PaimonReadBatchTimeMetric, 
PaimonResultedTableFilesMetric, PaimonResultedTableFilesTaskMetric, 
SparkTypeUtils}
 import org.apache.paimon.spark.schema.PaimonMetadataColumn
 import org.apache.paimon.spark.schema.PaimonMetadataColumn._
@@ -51,6 +51,7 @@ trait BaseScan extends Scan with SupportsReportStatistics 
with Logging {
   def pushedLimit: Option[Int] = None
   def pushedTopN: Option[TopN] = None
   def pushedVectorSearch: Option[VectorSearch] = None
+  def pushedFullTextSearch: Option[FullTextSearch] = None
 
   // Runtime push down
   val pushedRuntimePartitionFilters: ListBuffer[PartitionPredicate] = 
ListBuffer.empty
@@ -187,6 +188,7 @@ trait BaseScan extends Scan with SupportsReportStatistics 
with Logging {
       pushedDataFiltersStr +
       pushedTopN.map(topN => s", TopN: [$topN]").getOrElse("") +
       pushedLimit.map(limit => s", Limit: [$limit]").getOrElse("") +
-      pushedVectorSearch.map(vs => s", VectorSearch: [$vs]").getOrElse("")
+      pushedVectorSearch.map(vs => s", VectorSearch: [$vs]").getOrElse("") +
+      pushedFullTextSearch.map(fts => s", FullTextSearch: 
[$fts]").getOrElse("")
   }
 }
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/FullTextSearchTest.scala
 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/FullTextSearchTest.scala
new file mode 100644
index 0000000000..8f6d06fd75
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/FullTextSearchTest.scala
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+import org.apache.paimon.spark.PaimonSparkTestBase
+
+import scala.collection.JavaConverters._
+
/** Tests for full-text search read/write operations using test-only brute-force full-text index. */
class FullTextSearchTest extends PaimonSparkTestBase {

  // Index type registered by the test-only brute-force full-text indexer.
  private val indexType = "test-fulltext"

  /** Creates table T with the given column list and the options required for global indexes. */
  private def createTestTable(columns: String): Unit = {
    spark.sql(s"""
                 |CREATE TABLE T ($columns)
                 |TBLPROPERTIES (
                 |  'bucket' = '-1',
                 |  'global-index.row-count-per-shard' = '10000',
                 |  'row-tracking.enabled' = 'true',
                 |  'data-evolution.enabled' = 'true')
                 |""".stripMargin)
  }

  /** Runs the create_global_index procedure on column `content` and returns the collected rows. */
  private def createIndex() =
    spark
      .sql(
        s"CALL sys.create_global_index(table => 'test.T', index_column => 'content', index_type => '$indexType')")
      .collect()

  /** Collects the entries of the test full-text index from table T's index manifests. */
  private def fullTextIndexEntries() =
    loadTable("T")
      .store()
      .newIndexFileHandler()
      .scanEntries()
      .asScala
      .filter(_.indexFile().indexType() == indexType)

  /** Executes `SELECT * FROM full_text_search(...)` on column `content` with the given top-k. */
  private def search(query: String, k: Int) =
    spark
      .sql(s"""
              |SELECT * FROM full_text_search('T', 'content', '$query', $k)
              |""".stripMargin)
      .collect()

  // ========== Index Creation Tests ==========

  test("create full-text index - basic") {
    withTable("T") {
      createTestTable("id INT, content STRING")

      val rows = (0 until 100)
        .map(i => s"($i, 'document number $i about paimon lake format')")
        .mkString(",")
      spark.sql(s"INSERT INTO T VALUES $rows")

      // The procedure reports success in the first boolean column.
      assert(createIndex().head.getBoolean(0))

      val entries = fullTextIndexEntries()
      assert(entries.nonEmpty)
      // All 100 inserted rows must be covered by the written index files.
      assert(entries.map(_.indexFile().rowCount()).sum == 100L)
    }
  }

  // ========== Index Read/Search Tests ==========

  test("full-text search - basic search") {
    withTable("T") {
      createTestTable("id INT, content STRING")

      val rows = (0 until 100)
        .map(i => s"($i, 'document number $i about paimon lake format')")
        .mkString(",")
      spark.sql(s"INSERT INTO T VALUES $rows")

      createIndex()

      assert(search("paimon", 5).length == 5)
    }
  }

  test("full-text search - top-k with different k values") {
    withTable("T") {
      createTestTable("id INT, content STRING")

      val rows = (0 until 200)
        .map(i => s"($i, 'document number $i about paimon lake format')")
        .mkString(",")
      spark.sql(s"INSERT INTO T VALUES $rows")

      createIndex()

      // The requested k must be honored exactly for both small and larger values.
      assert(search("paimon", 1).length == 1)
      assert(search("paimon", 10).length == 10)
    }
  }

  test("full-text search - multi-term query") {
    withTable("T") {
      createTestTable("id INT, content STRING")

      spark.sql("""
                  |INSERT INTO T VALUES
                  |  (0, 'Apache Paimon lake format'),
                  |  (1, 'Paimon supports full-text search'),
                  |  (2, 'full-text search in Apache Paimon'),
                  |  (3, 'vector similarity search'),
                  |  (4, 'streaming batch processing')
                  |""".stripMargin)

      createIndex()

      // Query "Paimon search" - rows 1 and 2 match both terms
      val matched = spark
        .sql("""
               |SELECT id FROM full_text_search('T', 'content', 'Paimon search', 2)
               |ORDER BY id
               |""".stripMargin)
        .collect()

      assert(matched.length == 2)
      assert(matched.map(_.getInt(0)).toSet == Set(1, 2))
    }
  }

  // ========== Integration Tests ==========

  test("end-to-end: write, index, search cycle") {
    withTable("T") {
      createTestTable("id INT, title STRING, content STRING")

      val rows = (0 until 1000)
        .map(i => s"($i, 'title_$i', 'document number $i about paimon lake format')")
        .mkString(",")
      spark.sql(s"INSERT INTO T VALUES $rows")

      // Full cycle: write data, build the index, then search through it.
      assert(createIndex().head.getBoolean(0))
      assert(fullTextIndexEntries().nonEmpty)

      val searched = spark
        .sql("""
               |SELECT id, title FROM full_text_search('T', 'content', 'paimon', 10)
               |""".stripMargin)
        .collect()
      assert(searched.length == 10)
    }
  }
}

Reply via email to