This is an automated email from the ASF dual-hosted git repository. hope pushed a commit to branch release-1.4 in repository https://gitbox.apache.org/repos/asf/paimon.git
commit e6231d01c6a828e7677073c821d79349f3be5dda Author: Jingsong Lee <[email protected]> AuthorDate: Mon Mar 30 14:34:30 2026 +0800 [core][spark] Introduce Full Text Search interfaces and Spark function (#7548) Symmetric implementation of full text search based on the existing VectorSearch pattern. --- .../paimon/globalindex/GlobalIndexReader.java | 5 + .../globalindex/OffsetGlobalIndexReader.java | 6 + .../apache/paimon/predicate/FullTextSearch.java | 72 ++++ .../TestFullTextGlobalIndexReader.java | 254 ++++++++++++ .../TestFullTextGlobalIndexWriter.java | 110 ++++++ .../testfulltext/TestFullTextGlobalIndexer.java | 62 +++ .../TestFullTextGlobalIndexerFactory.java | 37 +- ....apache.paimon.globalindex.GlobalIndexerFactory | 1 + .../java/org/apache/paimon/table/FormatTable.java | 7 + .../apache/paimon/table/FullTextSearchTable.java | 101 +++++ .../java/org/apache/paimon/table/InnerTable.java | 7 + .../main/java/org/apache/paimon/table/Table.java | 4 + .../apache/paimon/table/source/FullTextRead.java | 27 +- .../paimon/table/source/FullTextReadImpl.java | 127 ++++++ .../apache/paimon/table/source/FullTextScan.java | 26 +- .../paimon/table/source/FullTextScanImpl.java | 88 +++++ .../paimon/table/source/FullTextSearchBuilder.java | 47 +++ .../table/source/FullTextSearchBuilderImpl.java | 74 ++++ .../paimon/table/source/FullTextSearchSplit.java | 82 ++++ .../table/source/FullTextSearchBuilderTest.java | 424 +++++++++++++++++++++ .../paimon/spark/PaimonBaseScanBuilder.scala | 2 + .../scala/org/apache/paimon/spark/PaimonScan.scala | 3 +- .../scala/org/apache/paimon/spark/PaimonScan.scala | 3 +- .../org/apache/paimon/spark/PaimonBaseScan.scala | 28 +- .../paimon/spark/PaimonBaseScanBuilder.scala | 2 + .../scala/org/apache/paimon/spark/PaimonScan.scala | 3 +- .../apache/paimon/spark/PaimonScanBuilder.scala | 11 +- .../plans/logical/PaimonTableValuedFunctions.scala | 89 ++++- .../org/apache/paimon/spark/read/BaseScan.scala | 6 +- 
.../paimon/spark/sql/FullTextSearchTest.scala | 223 +++++++++++ 30 files changed, 1869 insertions(+), 62 deletions(-) diff --git a/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexReader.java b/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexReader.java index 1b8c17bf66..f639273575 100644 --- a/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexReader.java +++ b/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexReader.java @@ -18,6 +18,7 @@ package org.apache.paimon.globalindex; +import org.apache.paimon.predicate.FullTextSearch; import org.apache.paimon.predicate.FunctionVisitor; import org.apache.paimon.predicate.LeafPredicate; import org.apache.paimon.predicate.VectorSearch; @@ -47,4 +48,8 @@ public interface GlobalIndexReader extends FunctionVisitor<Optional<GlobalIndexR default Optional<ScoredGlobalIndexResult> visitVectorSearch(VectorSearch vectorSearch) { throw new UnsupportedOperationException(); } + + default Optional<ScoredGlobalIndexResult> visitFullTextSearch(FullTextSearch fullTextSearch) { + throw new UnsupportedOperationException(); + } } diff --git a/paimon-common/src/main/java/org/apache/paimon/globalindex/OffsetGlobalIndexReader.java b/paimon-common/src/main/java/org/apache/paimon/globalindex/OffsetGlobalIndexReader.java index 21f06f59f3..0122060dfb 100644 --- a/paimon-common/src/main/java/org/apache/paimon/globalindex/OffsetGlobalIndexReader.java +++ b/paimon-common/src/main/java/org/apache/paimon/globalindex/OffsetGlobalIndexReader.java @@ -19,6 +19,7 @@ package org.apache.paimon.globalindex; import org.apache.paimon.predicate.FieldRef; +import org.apache.paimon.predicate.FullTextSearch; import org.apache.paimon.predicate.VectorSearch; import java.io.IOException; @@ -122,6 +123,11 @@ public class OffsetGlobalIndexReader implements GlobalIndexReader { .map(r -> r.offset(offset)); } + @Override + public Optional<ScoredGlobalIndexResult> visitFullTextSearch(FullTextSearch 
fullTextSearch) { + return wrapped.visitFullTextSearch(fullTextSearch).map(r -> r.offset(offset)); + } + private Optional<GlobalIndexResult> applyOffset(Optional<GlobalIndexResult> result) { return result.map(r -> r.offset(offset)); } diff --git a/paimon-common/src/main/java/org/apache/paimon/predicate/FullTextSearch.java b/paimon-common/src/main/java/org/apache/paimon/predicate/FullTextSearch.java new file mode 100644 index 0000000000..5b91d0bbc6 --- /dev/null +++ b/paimon-common/src/main/java/org/apache/paimon/predicate/FullTextSearch.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.predicate; + +import org.apache.paimon.globalindex.GlobalIndexReader; +import org.apache.paimon.globalindex.ScoredGlobalIndexResult; + +import java.io.Serializable; +import java.util.Optional; + +/** FullTextSearch to perform full-text search on a text column. 
*/ +public class FullTextSearch implements Serializable { + + private static final long serialVersionUID = 1L; + + private final String queryText; + private final String fieldName; + private final int limit; + + public FullTextSearch(String queryText, int limit, String fieldName) { + if (queryText == null || queryText.isEmpty()) { + throw new IllegalArgumentException("Query text cannot be null or empty"); + } + if (limit <= 0) { + throw new IllegalArgumentException("Limit must be positive, got: " + limit); + } + if (fieldName == null || fieldName.isEmpty()) { + throw new IllegalArgumentException("Field name cannot be null or empty"); + } + this.queryText = queryText; + this.limit = limit; + this.fieldName = fieldName; + } + + public String queryText() { + return queryText; + } + + public int limit() { + return limit; + } + + public String fieldName() { + return fieldName; + } + + public Optional<ScoredGlobalIndexResult> visit(GlobalIndexReader visitor) { + return visitor.visitFullTextSearch(this); + } + + @Override + public String toString() { + return String.format( + "FullTextSearch{field=%s, query='%s', limit=%d}", fieldName, queryText, limit); + } +} diff --git a/paimon-common/src/test/java/org/apache/paimon/globalindex/testfulltext/TestFullTextGlobalIndexReader.java b/paimon-common/src/test/java/org/apache/paimon/globalindex/testfulltext/TestFullTextGlobalIndexReader.java new file mode 100644 index 0000000000..affc79e916 --- /dev/null +++ b/paimon-common/src/test/java/org/apache/paimon/globalindex/testfulltext/TestFullTextGlobalIndexReader.java @@ -0,0 +1,254 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.globalindex.testfulltext; + +import org.apache.paimon.fs.SeekableInputStream; +import org.apache.paimon.globalindex.GlobalIndexIOMeta; +import org.apache.paimon.globalindex.GlobalIndexReader; +import org.apache.paimon.globalindex.GlobalIndexResult; +import org.apache.paimon.globalindex.ScoredGlobalIndexResult; +import org.apache.paimon.globalindex.io.GlobalIndexFileReader; +import org.apache.paimon.predicate.FieldRef; +import org.apache.paimon.predicate.FullTextSearch; +import org.apache.paimon.utils.RoaringNavigableMap64; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.nio.charset.StandardCharsets; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Optional; +import java.util.PriorityQueue; + +/** + * Test full-text index reader that performs brute-force text matching. Loads all documents into + * memory and scores them against the query using simple term-frequency matching. + * + * <p>Scoring: for each query term found in the document (case-insensitive), score += 1.0 / + * queryTermCount. A document that contains all query terms scores 1.0. 
+ */ +public class TestFullTextGlobalIndexReader implements GlobalIndexReader { + + private final GlobalIndexFileReader fileReader; + private final GlobalIndexIOMeta ioMeta; + + private String[] documents; + private int count; + + public TestFullTextGlobalIndexReader( + GlobalIndexFileReader fileReader, GlobalIndexIOMeta ioMeta) { + this.fileReader = fileReader; + this.ioMeta = ioMeta; + } + + @Override + public Optional<ScoredGlobalIndexResult> visitFullTextSearch(FullTextSearch fullTextSearch) { + try { + ensureLoaded(); + } catch (IOException e) { + throw new RuntimeException("Failed to load test full-text index", e); + } + + String queryText = fullTextSearch.queryText(); + int limit = fullTextSearch.limit(); + int effectiveK = Math.min(limit, count); + if (effectiveK <= 0) { + return Optional.empty(); + } + + String[] queryTerms = queryText.toLowerCase(Locale.ROOT).split("\\s+"); + + // Min-heap: smallest score at head, so we evict the weakest candidate. + PriorityQueue<ScoredRow> topK = + new PriorityQueue<>(effectiveK + 1, Comparator.comparingDouble(s -> s.score)); + + for (int i = 0; i < count; i++) { + float score = computeScore(documents[i], queryTerms); + if (score <= 0) { + continue; + } + if (topK.size() < effectiveK) { + topK.offer(new ScoredRow(i, score)); + } else if (score > topK.peek().score) { + topK.poll(); + topK.offer(new ScoredRow(i, score)); + } + } + + if (topK.isEmpty()) { + return Optional.empty(); + } + + RoaringNavigableMap64 resultBitmap = new RoaringNavigableMap64(); + Map<Long, Float> scoreMap = new HashMap<>(topK.size()); + for (ScoredRow row : topK) { + resultBitmap.add(row.rowId); + scoreMap.put(row.rowId, row.score); + } + + return Optional.of(ScoredGlobalIndexResult.create(() -> resultBitmap, scoreMap::get)); + } + + private static float computeScore(String document, String[] queryTerms) { + String lowerDoc = document.toLowerCase(Locale.ROOT); + float score = 0; + for (String term : queryTerms) { + if (lowerDoc.contains(term)) { 
+ score += 1.0f / queryTerms.length; + } + } + return score; + } + + private void ensureLoaded() throws IOException { + if (documents != null) { + return; + } + + try (SeekableInputStream in = fileReader.getInputStream(ioMeta)) { + // Read header: count (4 bytes) + byte[] headerBytes = new byte[4]; + readFully(in, headerBytes); + ByteBuffer header = ByteBuffer.wrap(headerBytes); + header.order(ByteOrder.LITTLE_ENDIAN); + count = header.getInt(); + + // Read documents + documents = new String[count]; + for (int i = 0; i < count; i++) { + byte[] lenBytes = new byte[4]; + readFully(in, lenBytes); + ByteBuffer lenBuf = ByteBuffer.wrap(lenBytes); + lenBuf.order(ByteOrder.LITTLE_ENDIAN); + int textLen = lenBuf.getInt(); + + byte[] textBytes = new byte[textLen]; + readFully(in, textBytes); + documents[i] = new String(textBytes, StandardCharsets.UTF_8); + } + } + } + + private static void readFully(SeekableInputStream in, byte[] buf) throws IOException { + int offset = 0; + while (offset < buf.length) { + int bytesRead = in.read(buf, offset, buf.length - offset); + if (bytesRead < 0) { + throw new IOException( + "Unexpected end of stream: read " + + offset + + " bytes but expected " + + buf.length); + } + offset += bytesRead; + } + } + + @Override + public void close() throws IOException { + documents = null; + } + + // =================== unsupported predicate operations ===================== + + @Override + public Optional<GlobalIndexResult> visitIsNotNull(FieldRef fieldRef) { + return Optional.empty(); + } + + @Override + public Optional<GlobalIndexResult> visitIsNull(FieldRef fieldRef) { + return Optional.empty(); + } + + @Override + public Optional<GlobalIndexResult> visitStartsWith(FieldRef fieldRef, Object literal) { + return Optional.empty(); + } + + @Override + public Optional<GlobalIndexResult> visitEndsWith(FieldRef fieldRef, Object literal) { + return Optional.empty(); + } + + @Override + public Optional<GlobalIndexResult> visitContains(FieldRef fieldRef, 
Object literal) { + return Optional.empty(); + } + + @Override + public Optional<GlobalIndexResult> visitLike(FieldRef fieldRef, Object literal) { + return Optional.empty(); + } + + @Override + public Optional<GlobalIndexResult> visitLessThan(FieldRef fieldRef, Object literal) { + return Optional.empty(); + } + + @Override + public Optional<GlobalIndexResult> visitGreaterOrEqual(FieldRef fieldRef, Object literal) { + return Optional.empty(); + } + + @Override + public Optional<GlobalIndexResult> visitNotEqual(FieldRef fieldRef, Object literal) { + return Optional.empty(); + } + + @Override + public Optional<GlobalIndexResult> visitLessOrEqual(FieldRef fieldRef, Object literal) { + return Optional.empty(); + } + + @Override + public Optional<GlobalIndexResult> visitEqual(FieldRef fieldRef, Object literal) { + return Optional.empty(); + } + + @Override + public Optional<GlobalIndexResult> visitGreaterThan(FieldRef fieldRef, Object literal) { + return Optional.empty(); + } + + @Override + public Optional<GlobalIndexResult> visitIn(FieldRef fieldRef, List<Object> literals) { + return Optional.empty(); + } + + @Override + public Optional<GlobalIndexResult> visitNotIn(FieldRef fieldRef, List<Object> literals) { + return Optional.empty(); + } + + /** A row ID paired with its similarity score, used in the top-k min-heap. 
*/ + private static class ScoredRow { + final long rowId; + final float score; + + ScoredRow(long rowId, float score) { + this.rowId = rowId; + this.score = score; + } + } +} diff --git a/paimon-common/src/test/java/org/apache/paimon/globalindex/testfulltext/TestFullTextGlobalIndexWriter.java b/paimon-common/src/test/java/org/apache/paimon/globalindex/testfulltext/TestFullTextGlobalIndexWriter.java new file mode 100644 index 0000000000..cf8d2de24a --- /dev/null +++ b/paimon-common/src/test/java/org/apache/paimon/globalindex/testfulltext/TestFullTextGlobalIndexWriter.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.globalindex.testfulltext; + +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.fs.PositionOutputStream; +import org.apache.paimon.globalindex.GlobalIndexSingletonWriter; +import org.apache.paimon.globalindex.ResultEntry; +import org.apache.paimon.globalindex.io.GlobalIndexFileWriter; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +/** + * Test full-text index writer that stores all text documents in memory and writes them to a single + * binary file on {@link #finish()}. + * + * <p>Binary format (all values little-endian): + * + * <pre> + * [4 bytes] count (int) + * For each document: + * [4 bytes] text length in bytes (int) + * [N bytes] UTF-8 text + * </pre> + */ +public class TestFullTextGlobalIndexWriter implements GlobalIndexSingletonWriter { + + private static final String FILE_NAME_PREFIX = "test-fulltext"; + + private final GlobalIndexFileWriter fileWriter; + private final List<String> documents; + + public TestFullTextGlobalIndexWriter(GlobalIndexFileWriter fileWriter) { + this.fileWriter = fileWriter; + this.documents = new ArrayList<>(); + } + + @Override + public void write(Object fieldData) { + if (fieldData == null) { + throw new IllegalArgumentException("Text field data must not be null"); + } + + String text; + if (fieldData instanceof String) { + text = (String) fieldData; + } else if (fieldData instanceof BinaryString) { + text = fieldData.toString(); + } else { + throw new IllegalArgumentException( + "Unsupported text type: " + fieldData.getClass().getName()); + } + documents.add(text); + } + + @Override + public List<ResultEntry> finish() { + if (documents.isEmpty()) { + return Collections.emptyList(); + } + + try { + String fileName = fileWriter.newFileName(FILE_NAME_PREFIX); + try (PositionOutputStream out = 
fileWriter.newOutputStream(fileName)) { + // Header: count + ByteBuffer header = ByteBuffer.allocate(4); + header.order(ByteOrder.LITTLE_ENDIAN); + header.putInt(documents.size()); + out.write(header.array()); + + // Documents + for (String doc : documents) { + byte[] textBytes = doc.getBytes(StandardCharsets.UTF_8); + ByteBuffer lenBuf = ByteBuffer.allocate(4); + lenBuf.order(ByteOrder.LITTLE_ENDIAN); + lenBuf.putInt(textBytes.length); + out.write(lenBuf.array()); + out.write(textBytes); + } + out.flush(); + } + + return Collections.singletonList(new ResultEntry(fileName, documents.size(), null)); + } catch (IOException e) { + throw new RuntimeException("Failed to write test full-text index", e); + } + } +} diff --git a/paimon-common/src/test/java/org/apache/paimon/globalindex/testfulltext/TestFullTextGlobalIndexer.java b/paimon-common/src/test/java/org/apache/paimon/globalindex/testfulltext/TestFullTextGlobalIndexer.java new file mode 100644 index 0000000000..d9d3d14ff7 --- /dev/null +++ b/paimon-common/src/test/java/org/apache/paimon/globalindex/testfulltext/TestFullTextGlobalIndexer.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.globalindex.testfulltext; + +import org.apache.paimon.globalindex.GlobalIndexIOMeta; +import org.apache.paimon.globalindex.GlobalIndexReader; +import org.apache.paimon.globalindex.GlobalIndexWriter; +import org.apache.paimon.globalindex.GlobalIndexer; +import org.apache.paimon.globalindex.io.GlobalIndexFileReader; +import org.apache.paimon.globalindex.io.GlobalIndexFileWriter; +import org.apache.paimon.options.Options; +import org.apache.paimon.types.DataType; +import org.apache.paimon.types.VarCharType; + +import java.io.IOException; +import java.util.List; + +import static org.apache.paimon.utils.Preconditions.checkArgument; + +/** + * A test-only {@link GlobalIndexer} for full-text search. Uses brute-force in-memory inverted index + * for text queries. No native library dependency required. + */ +public class TestFullTextGlobalIndexer implements GlobalIndexer { + + private final DataType fieldType; + + public TestFullTextGlobalIndexer(DataType fieldType, Options options) { + checkArgument( + fieldType instanceof VarCharType, + "TestFullTextGlobalIndexer only supports VARCHAR/STRING, but got: " + fieldType); + this.fieldType = fieldType; + } + + @Override + public GlobalIndexWriter createWriter(GlobalIndexFileWriter fileWriter) throws IOException { + return new TestFullTextGlobalIndexWriter(fileWriter); + } + + @Override + public GlobalIndexReader createReader( + GlobalIndexFileReader fileReader, List<GlobalIndexIOMeta> files) throws IOException { + checkArgument(files.size() == 1, "Expected exactly one index file per shard"); + return new TestFullTextGlobalIndexReader(fileReader, files.get(0)); + } +} diff --git a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonScan.scala b/paimon-common/src/test/java/org/apache/paimon/globalindex/testfulltext/TestFullTextGlobalIndexerFactory.java similarity index 50% copy from paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonScan.scala copy 
to paimon-common/src/test/java/org/apache/paimon/globalindex/testfulltext/TestFullTextGlobalIndexerFactory.java index 2ad14bd055..b501c2df10 100644 --- a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonScan.scala +++ b/paimon-common/src/test/java/org/apache/paimon/globalindex/testfulltext/TestFullTextGlobalIndexerFactory.java @@ -16,21 +16,28 @@ * limitations under the License. */ -package org.apache.paimon.spark +package org.apache.paimon.globalindex.testfulltext; -import org.apache.paimon.partition.PartitionPredicate -import org.apache.paimon.predicate.{Predicate, TopN, VectorSearch} -import org.apache.paimon.table.InnerTable +import org.apache.paimon.globalindex.GlobalIndexer; +import org.apache.paimon.globalindex.GlobalIndexerFactory; +import org.apache.paimon.options.Options; +import org.apache.paimon.types.DataField; -import org.apache.spark.sql.types.StructType +/** + * A test-only {@link GlobalIndexerFactory} for full-text search. Uses brute-force in-memory + * inverted index, no native dependencies required. 
+ */ +public class TestFullTextGlobalIndexerFactory implements GlobalIndexerFactory { + + public static final String IDENTIFIER = "test-fulltext"; + + @Override + public String identifier() { + return IDENTIFIER; + } -case class PaimonScan( - table: InnerTable, - requiredSchema: StructType, - pushedPartitionFilters: Seq[PartitionPredicate], - pushedDataFilters: Seq[Predicate], - override val pushedLimit: Option[Int] = None, - override val pushedTopN: Option[TopN] = None, - override val pushedVectorSearch: Option[VectorSearch] = None, - bucketedScanDisabled: Boolean = true) - extends PaimonBaseScan(table) {} + @Override + public GlobalIndexer create(DataField field, Options options) { + return new TestFullTextGlobalIndexer(field.type(), options); + } +} diff --git a/paimon-common/src/test/resources/META-INF/services/org.apache.paimon.globalindex.GlobalIndexerFactory b/paimon-common/src/test/resources/META-INF/services/org.apache.paimon.globalindex.GlobalIndexerFactory index 3fa7826081..841301f1b2 100644 --- a/paimon-common/src/test/resources/META-INF/services/org.apache.paimon.globalindex.GlobalIndexerFactory +++ b/paimon-common/src/test/resources/META-INF/services/org.apache.paimon.globalindex.GlobalIndexerFactory @@ -14,3 +14,4 @@ # limitations under the License. 
org.apache.paimon.globalindex.testvector.TestVectorGlobalIndexerFactory +org.apache.paimon.globalindex.testfulltext.TestFullTextGlobalIndexerFactory diff --git a/paimon-core/src/main/java/org/apache/paimon/table/FormatTable.java b/paimon-core/src/main/java/org/apache/paimon/table/FormatTable.java index 8f298ff9bf..d7af8510a9 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/FormatTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/FormatTable.java @@ -31,6 +31,7 @@ import org.apache.paimon.table.format.FormatBatchWriteBuilder; import org.apache.paimon.table.format.FormatReadBuilder; import org.apache.paimon.table.sink.BatchWriteBuilder; import org.apache.paimon.table.sink.StreamWriteBuilder; +import org.apache.paimon.table.source.FullTextSearchBuilder; import org.apache.paimon.table.source.ReadBuilder; import org.apache.paimon.table.source.VectorSearchBuilder; import org.apache.paimon.types.RowType; @@ -278,6 +279,12 @@ public interface FormatTable extends Table { throw new UnsupportedOperationException("FormatTable does not support vector search."); } + @Override + public FullTextSearchBuilder newFullTextSearchBuilder() { + throw new UnsupportedOperationException( + "FormatTable does not support full-text search."); + } + @Override public CatalogContext catalogContext() { return this.catalogContext; diff --git a/paimon-core/src/main/java/org/apache/paimon/table/FullTextSearchTable.java b/paimon-core/src/main/java/org/apache/paimon/table/FullTextSearchTable.java new file mode 100644 index 0000000000..efe4a9f3d9 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/table/FullTextSearchTable.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.table; + +import org.apache.paimon.fs.FileIO; +import org.apache.paimon.predicate.FullTextSearch; +import org.apache.paimon.table.source.InnerTableRead; +import org.apache.paimon.table.source.InnerTableScan; +import org.apache.paimon.types.RowType; + +import java.util.List; +import java.util.Map; + +/** + * A table wrapper to hold full-text search information. This is used to pass full-text search + * pushdown information from logical plan optimization to physical plan execution. For now, it is + * only used by internal for Spark engine. 
+ */ +public class FullTextSearchTable implements ReadonlyTable { + + private final InnerTable origin; + private final FullTextSearch fullTextSearch; + + private FullTextSearchTable(InnerTable origin, FullTextSearch fullTextSearch) { + this.origin = origin; + this.fullTextSearch = fullTextSearch; + } + + public static FullTextSearchTable create(InnerTable origin, FullTextSearch fullTextSearch) { + return new FullTextSearchTable(origin, fullTextSearch); + } + + public FullTextSearch fullTextSearch() { + return fullTextSearch; + } + + public InnerTable origin() { + return origin; + } + + @Override + public String name() { + return origin.name(); + } + + @Override + public RowType rowType() { + return origin.rowType(); + } + + @Override + public List<String> primaryKeys() { + return origin.primaryKeys(); + } + + @Override + public List<String> partitionKeys() { + return origin.partitionKeys(); + } + + @Override + public Map<String, String> options() { + return origin.options(); + } + + @Override + public FileIO fileIO() { + return origin.fileIO(); + } + + @Override + public InnerTableRead newRead() { + return origin.newRead(); + } + + @Override + public InnerTableScan newScan() { + throw new UnsupportedOperationException(); + } + + @Override + public Table copy(Map<String, String> dynamicOptions) { + return new FullTextSearchTable((InnerTable) origin.copy(dynamicOptions), fullTextSearch); + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/table/InnerTable.java b/paimon-core/src/main/java/org/apache/paimon/table/InnerTable.java index c404672e43..d360597744 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/InnerTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/InnerTable.java @@ -25,6 +25,8 @@ import org.apache.paimon.table.sink.InnerTableWrite; import org.apache.paimon.table.sink.StreamWriteBuilder; import org.apache.paimon.table.sink.StreamWriteBuilderImpl; import org.apache.paimon.table.sink.WriteSelector; +import 
org.apache.paimon.table.source.FullTextSearchBuilder; +import org.apache.paimon.table.source.FullTextSearchBuilderImpl; import org.apache.paimon.table.source.InnerTableRead; import org.apache.paimon.table.source.InnerTableScan; import org.apache.paimon.table.source.ReadBuilder; @@ -60,6 +62,11 @@ public interface InnerTable extends Table { return new VectorSearchBuilderImpl(this); } + @Override + default FullTextSearchBuilder newFullTextSearchBuilder() { + return new FullTextSearchBuilderImpl(this); + } + @Override default BatchWriteBuilder newBatchWriteBuilder() { return new BatchWriteBuilderImpl(this); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/Table.java b/paimon-core/src/main/java/org/apache/paimon/table/Table.java index 938718db98..13d56d6849 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/Table.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/Table.java @@ -28,6 +28,7 @@ import org.apache.paimon.manifest.ManifestFileMeta; import org.apache.paimon.stats.Statistics; import org.apache.paimon.table.sink.BatchWriteBuilder; import org.apache.paimon.table.sink.StreamWriteBuilder; +import org.apache.paimon.table.source.FullTextSearchBuilder; import org.apache.paimon.table.source.ReadBuilder; import org.apache.paimon.table.source.VectorSearchBuilder; import org.apache.paimon.types.RowType; @@ -217,6 +218,9 @@ public interface Table extends Serializable { /** Returns a new vector search builder. */ VectorSearchBuilder newVectorSearchBuilder(); + /** Returns a new full-text search builder. */ + FullTextSearchBuilder newFullTextSearchBuilder(); + /** Returns a new read builder. 
*/ ReadBuilder newReadBuilder(); diff --git a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonScan.scala b/paimon-core/src/main/java/org/apache/paimon/table/source/FullTextRead.java similarity index 55% copy from paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonScan.scala copy to paimon-core/src/main/java/org/apache/paimon/table/source/FullTextRead.java index 2ad14bd055..77cc04b16d 100644 --- a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonScan.scala +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/FullTextRead.java @@ -16,21 +16,18 @@ * limitations under the License. */ -package org.apache.paimon.spark +package org.apache.paimon.table.source; -import org.apache.paimon.partition.PartitionPredicate -import org.apache.paimon.predicate.{Predicate, TopN, VectorSearch} -import org.apache.paimon.table.InnerTable +import org.apache.paimon.globalindex.GlobalIndexResult; -import org.apache.spark.sql.types.StructType +import java.util.List; -case class PaimonScan( - table: InnerTable, - requiredSchema: StructType, - pushedPartitionFilters: Seq[PartitionPredicate], - pushedDataFilters: Seq[Predicate], - override val pushedLimit: Option[Int] = None, - override val pushedTopN: Option[TopN] = None, - override val pushedVectorSearch: Option[VectorSearch] = None, - bucketedScanDisabled: Boolean = true) - extends PaimonBaseScan(table) {} +/** Full-text read to read index files. 
*/ +public interface FullTextRead { + + default GlobalIndexResult read(FullTextScan.Plan plan) { + return read(plan.splits()); + } + + GlobalIndexResult read(List<FullTextSearchSplit> splits); +} diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/FullTextReadImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/FullTextReadImpl.java new file mode 100644 index 0000000000..f58f8f26ea --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/FullTextReadImpl.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.table.source; + +import org.apache.paimon.fs.FileIO; +import org.apache.paimon.globalindex.GlobalIndexIOMeta; +import org.apache.paimon.globalindex.GlobalIndexReader; +import org.apache.paimon.globalindex.GlobalIndexResult; +import org.apache.paimon.globalindex.GlobalIndexer; +import org.apache.paimon.globalindex.GlobalIndexerFactoryUtils; +import org.apache.paimon.globalindex.OffsetGlobalIndexReader; +import org.apache.paimon.globalindex.ScoredGlobalIndexResult; +import org.apache.paimon.globalindex.io.GlobalIndexFileReader; +import org.apache.paimon.index.GlobalIndexMeta; +import org.apache.paimon.index.IndexFileMeta; +import org.apache.paimon.index.IndexPathFactory; +import org.apache.paimon.predicate.FullTextSearch; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.types.DataField; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Optional; + +import static java.util.Collections.singletonList; +import static org.apache.paimon.utils.ManifestReadThreadPool.randomlyExecuteSequentialReturn; +import static org.apache.paimon.utils.Preconditions.checkNotNull; + +/** Implementation for {@link FullTextRead}. 
*/ +public class FullTextReadImpl implements FullTextRead { + + private final FileStoreTable table; + private final int limit; + private final DataField textColumn; + private final String queryText; + + public FullTextReadImpl( + FileStoreTable table, int limit, DataField textColumn, String queryText) { + this.table = table; + this.limit = limit; + this.textColumn = textColumn; + this.queryText = queryText; + } + + @Override + public GlobalIndexResult read(List<FullTextSearchSplit> splits) { + if (splits.isEmpty()) { + return GlobalIndexResult.createEmpty(); + } + + Integer threadNum = table.coreOptions().globalIndexThreadNum(); + + String indexType = splits.get(0).fullTextIndexFiles().get(0).indexType(); + GlobalIndexer globalIndexer = + GlobalIndexerFactoryUtils.load(indexType) + .create(textColumn, table.coreOptions().toConfiguration()); + IndexPathFactory indexPathFactory = table.store().pathFactory().globalIndexFileFactory(); + Iterator<Optional<ScoredGlobalIndexResult>> resultIterators = + randomlyExecuteSequentialReturn( + split -> + singletonList( + eval( + globalIndexer, + indexPathFactory, + split.rowRangeStart(), + split.rowRangeEnd(), + split.fullTextIndexFiles())), + splits, + threadNum); + + ScoredGlobalIndexResult result = ScoredGlobalIndexResult.createEmpty(); + while (resultIterators.hasNext()) { + Optional<ScoredGlobalIndexResult> next = resultIterators.next(); + if (next.isPresent()) { + result = result.or(next.get()); + } + } + + return result.topK(limit); + } + + private Optional<ScoredGlobalIndexResult> eval( + GlobalIndexer globalIndexer, + IndexPathFactory indexPathFactory, + long rowRangeStart, + long rowRangeEnd, + List<IndexFileMeta> fullTextIndexFiles) { + List<GlobalIndexIOMeta> indexIOMetaList = new ArrayList<>(); + for (IndexFileMeta indexFile : fullTextIndexFiles) { + GlobalIndexMeta meta = checkNotNull(indexFile.globalIndexMeta()); + indexIOMetaList.add( + new GlobalIndexIOMeta( + indexPathFactory.toPath(indexFile), + 
indexFile.fileSize(), + meta.indexMeta())); + } + @SuppressWarnings("resource") + FileIO fileIO = table.fileIO(); + GlobalIndexFileReader indexFileReader = m -> fileIO.newInputStream(m.filePath()); + try (GlobalIndexReader reader = + globalIndexer.createReader(indexFileReader, indexIOMetaList)) { + FullTextSearch fullTextSearch = new FullTextSearch(queryText, limit, textColumn.name()); + return new OffsetGlobalIndexReader(reader, rowRangeStart, rowRangeEnd) + .visitFullTextSearch(fullTextSearch); + } catch (IOException e) { + throw new RuntimeException(e); + } + } +} diff --git a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonScan.scala b/paimon-core/src/main/java/org/apache/paimon/table/source/FullTextScan.java similarity index 55% copy from paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonScan.scala copy to paimon-core/src/main/java/org/apache/paimon/table/source/FullTextScan.java index 2ad14bd055..563ffe1c9d 100644 --- a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonScan.scala +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/FullTextScan.java @@ -16,21 +16,17 @@ * limitations under the License. */ -package org.apache.paimon.spark +package org.apache.paimon.table.source; -import org.apache.paimon.partition.PartitionPredicate -import org.apache.paimon.predicate.{Predicate, TopN, VectorSearch} -import org.apache.paimon.table.InnerTable +import java.util.List; -import org.apache.spark.sql.types.StructType +/** Full-text scan to scan index files. 
*/ +public interface FullTextScan { -case class PaimonScan( - table: InnerTable, - requiredSchema: StructType, - pushedPartitionFilters: Seq[PartitionPredicate], - pushedDataFilters: Seq[Predicate], - override val pushedLimit: Option[Int] = None, - override val pushedTopN: Option[TopN] = None, - override val pushedVectorSearch: Option[VectorSearch] = None, - bucketedScanDisabled: Boolean = true) - extends PaimonBaseScan(table) {} + Plan scan(); + + /** Plan of full-text scan. */ + interface Plan { + List<FullTextSearchSplit> splits(); + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/FullTextScanImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/FullTextScanImpl.java new file mode 100644 index 0000000000..cc77d9121a --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/FullTextScanImpl.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.table.source; + +import org.apache.paimon.Snapshot; +import org.apache.paimon.index.GlobalIndexMeta; +import org.apache.paimon.index.IndexFileHandler; +import org.apache.paimon.index.IndexFileMeta; +import org.apache.paimon.manifest.IndexManifestEntry; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.source.snapshot.TimeTravelUtil; +import org.apache.paimon.types.DataField; +import org.apache.paimon.utils.Filter; +import org.apache.paimon.utils.Range; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.stream.Collectors; + +import static org.apache.paimon.utils.Preconditions.checkNotNull; + +/** Implementation for {@link FullTextScan}. */ +public class FullTextScanImpl implements FullTextScan { + + private final FileStoreTable table; + private final DataField textColumn; + + public FullTextScanImpl(FileStoreTable table, DataField textColumn) { + this.table = table; + this.textColumn = textColumn; + } + + @Override + public Plan scan() { + Objects.requireNonNull(textColumn, "Text column must be set"); + + Snapshot snapshot = TimeTravelUtil.tryTravelOrLatest(table); + IndexFileHandler indexFileHandler = table.store().newIndexFileHandler(); + Filter<IndexManifestEntry> indexFileFilter = + entry -> { + GlobalIndexMeta globalIndex = entry.indexFile().globalIndexMeta(); + if (globalIndex == null) { + return false; + } + return textColumn.id() == globalIndex.indexFieldId(); + }; + + List<IndexFileMeta> allIndexFiles = + indexFileHandler.scan(snapshot, indexFileFilter).stream() + .map(IndexManifestEntry::indexFile) + .collect(Collectors.toList()); + + // Group full-text index files by (rowRangeStart, rowRangeEnd) + Map<Range, List<IndexFileMeta>> byRange = new HashMap<>(); + for (IndexFileMeta indexFile : allIndexFiles) { + GlobalIndexMeta meta = checkNotNull(indexFile.globalIndexMeta()); + Range range = new 
Range(meta.rowRangeStart(), meta.rowRangeEnd()); + byRange.computeIfAbsent(range, k -> new ArrayList<>()).add(indexFile); + } + + List<FullTextSearchSplit> splits = new ArrayList<>(); + for (Map.Entry<Range, List<IndexFileMeta>> entry : byRange.entrySet()) { + Range range = entry.getKey(); + splits.add(new FullTextSearchSplit(range.from, range.to, entry.getValue())); + } + + return () -> splits; + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/FullTextSearchBuilder.java b/paimon-core/src/main/java/org/apache/paimon/table/source/FullTextSearchBuilder.java new file mode 100644 index 0000000000..731962d0cf --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/FullTextSearchBuilder.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.table.source; + +import org.apache.paimon.globalindex.GlobalIndexResult; + +import java.io.Serializable; + +/** Builder to build full-text search. */ +public interface FullTextSearchBuilder extends Serializable { + + /** The top k results to return. */ + FullTextSearchBuilder withLimit(int limit); + + /** The text column to search. 
*/ + FullTextSearchBuilder withTextColumn(String name); + + /** The query text to search. */ + FullTextSearchBuilder withQueryText(String queryText); + + /** Create full-text scan to scan index files. */ + FullTextScan newFullTextScan(); + + /** Create full-text read to read index files. */ + FullTextRead newFullTextRead(); + + /** Execute full-text index search in local. */ + default GlobalIndexResult executeLocal() { + return newFullTextRead().read(newFullTextScan().scan()); + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/FullTextSearchBuilderImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/FullTextSearchBuilderImpl.java new file mode 100644 index 0000000000..1165fcf205 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/FullTextSearchBuilderImpl.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.table.source; + +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.InnerTable; +import org.apache.paimon.types.DataField; + +import static org.apache.paimon.utils.Preconditions.checkArgument; +import static org.apache.paimon.utils.Preconditions.checkNotNull; + +/** Implementation for {@link FullTextSearchBuilder}. */ +public class FullTextSearchBuilderImpl implements FullTextSearchBuilder { + + private static final long serialVersionUID = 1L; + + private final FileStoreTable table; + + private int limit; + private DataField textColumn; + private String queryText; + + public FullTextSearchBuilderImpl(InnerTable table) { + this.table = (FileStoreTable) table; + } + + @Override + public FullTextSearchBuilder withLimit(int limit) { + this.limit = limit; + return this; + } + + @Override + public FullTextSearchBuilder withTextColumn(String name) { + this.textColumn = table.rowType().getField(name); + return this; + } + + @Override + public FullTextSearchBuilder withQueryText(String queryText) { + this.queryText = queryText; + return this; + } + + @Override + public FullTextScan newFullTextScan() { + checkNotNull(textColumn, "Text column must be set via withTextColumn()"); + return new FullTextScanImpl(table, textColumn); + } + + @Override + public FullTextRead newFullTextRead() { + checkArgument(limit > 0, "Limit must be positive, set via withLimit()"); + checkNotNull(textColumn, "Text column must be set via withTextColumn()"); + checkNotNull(queryText, "Query text must be set via withQueryText()"); + return new FullTextReadImpl(table, limit, textColumn, queryText); + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/FullTextSearchSplit.java b/paimon-core/src/main/java/org/apache/paimon/table/source/FullTextSearchSplit.java new file mode 100644 index 0000000000..d230482186 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/FullTextSearchSplit.java @@ -0,0 
+1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.table.source; + +import org.apache.paimon.index.IndexFileMeta; + +import java.io.Serializable; +import java.util.List; +import java.util.Objects; + +/** Split of full-text search. 
*/ +public class FullTextSearchSplit implements Serializable { + + private static final long serialVersionUID = 1L; + + private final long rowRangeStart; + private final long rowRangeEnd; + private final List<IndexFileMeta> fullTextIndexFiles; + + public FullTextSearchSplit( + long rowRangeStart, long rowRangeEnd, List<IndexFileMeta> fullTextIndexFiles) { + this.rowRangeStart = rowRangeStart; + this.rowRangeEnd = rowRangeEnd; + this.fullTextIndexFiles = fullTextIndexFiles; + } + + public long rowRangeStart() { + return rowRangeStart; + } + + public long rowRangeEnd() { + return rowRangeEnd; + } + + public List<IndexFileMeta> fullTextIndexFiles() { + return fullTextIndexFiles; + } + + @Override + public boolean equals(Object o) { + if (o == null || getClass() != o.getClass()) { + return false; + } + FullTextSearchSplit that = (FullTextSearchSplit) o; + return rowRangeStart == that.rowRangeStart + && rowRangeEnd == that.rowRangeEnd + && Objects.equals(fullTextIndexFiles, that.fullTextIndexFiles); + } + + @Override + public int hashCode() { + return Objects.hash(rowRangeStart, rowRangeEnd, fullTextIndexFiles); + } + + @Override + public String toString() { + return "FullTextSearchSplit{" + + "rowRangeStart=" + + rowRangeStart + + ", rowRangeEnd=" + + rowRangeEnd + + ", fullTextIndexFiles=" + + fullTextIndexFiles + + '}'; + } +} diff --git a/paimon-core/src/test/java/org/apache/paimon/table/source/FullTextSearchBuilderTest.java b/paimon-core/src/test/java/org/apache/paimon/table/source/FullTextSearchBuilderTest.java new file mode 100644 index 0000000000..c52a2a46af --- /dev/null +++ b/paimon-core/src/test/java/org/apache/paimon/table/source/FullTextSearchBuilderTest.java @@ -0,0 +1,424 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.table.source; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.globalindex.GlobalIndexBuilderUtils; +import org.apache.paimon.globalindex.GlobalIndexResult; +import org.apache.paimon.globalindex.GlobalIndexSingletonWriter; +import org.apache.paimon.globalindex.ResultEntry; +import org.apache.paimon.globalindex.ScoredGlobalIndexResult; +import org.apache.paimon.globalindex.testfulltext.TestFullTextGlobalIndexerFactory; +import org.apache.paimon.index.IndexFileMeta; +import org.apache.paimon.io.CompactIncrement; +import org.apache.paimon.io.DataIncrement; +import org.apache.paimon.options.Options; +import org.apache.paimon.reader.RecordReader; +import org.apache.paimon.schema.Schema; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.TableTestBase; +import org.apache.paimon.table.sink.BatchTableCommit; +import org.apache.paimon.table.sink.BatchTableWrite; +import org.apache.paimon.table.sink.BatchWriteBuilder; +import org.apache.paimon.table.sink.CommitMessage; +import org.apache.paimon.table.sink.CommitMessageImpl; +import org.apache.paimon.types.DataField; +import org.apache.paimon.types.DataTypes; +import org.apache.paimon.utils.Range; + 
+import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link FullTextSearchBuilder} using test-only brute-force full-text index. */ +public class FullTextSearchBuilderTest extends TableTestBase { + + private static final String TEXT_FIELD_NAME = "content"; + + @Override + protected Schema schemaDefault() { + return Schema.newBuilder() + .column("id", DataTypes.INT()) + .column(TEXT_FIELD_NAME, DataTypes.STRING()) + .option(CoreOptions.BUCKET.key(), "-1") + .option(CoreOptions.ROW_TRACKING_ENABLED.key(), "true") + .option(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true") + .build(); + } + + @Test + public void testFullTextSearchEndToEnd() throws Exception { + createTableDefault(); + FileStoreTable table = getTableDefault(); + + String[] documents = { + "Apache Paimon is a lake format", + "Paimon supports full-text search", + "Vector search is also supported", + "Paimon provides streaming and batch processing", + "Full-text indexing enables fast text queries", + "The lake format supports ACID transactions" + }; + + writeDocuments(table, documents); + buildAndCommitIndex(table, documents); + + // Query "Paimon" - should match rows 0, 1, 3 + GlobalIndexResult result = + table.newFullTextSearchBuilder() + .withQueryText("Paimon") + .withLimit(3) + .withTextColumn(TEXT_FIELD_NAME) + .executeLocal(); + + assertThat(result).isInstanceOf(ScoredGlobalIndexResult.class); + assertThat(result.results().isEmpty()).isFalse(); + + // Read using the search result + ReadBuilder readBuilder = table.newReadBuilder(); + List<Integer> ids = new ArrayList<>(); + TableScan.Plan plan = readBuilder.newScan().withGlobalIndexResult(result).plan(); + try (RecordReader<InternalRow> reader = readBuilder.newRead().createReader(plan)) { + reader.forEachRemaining(row -> ids.add(row.getInt(0))); + } + + assertThat(ids).isNotEmpty(); + 
assertThat(ids.size()).isLessThanOrEqualTo(3); + // Rows 0, 1, 3 contain "Paimon" + assertThat(ids).containsAnyOf(0, 1, 3); + } + + @Test + public void testFullTextSearchMultiTermQuery() throws Exception { + createTableDefault(); + FileStoreTable table = getTableDefault(); + + String[] documents = { + "Apache Paimon lake format", + "Paimon full-text search support", + "full-text search in Apache Paimon", + "Vector search capabilities", + }; + + writeDocuments(table, documents); + buildAndCommitIndex(table, documents); + + // Query "Paimon search" - rows 1 and 2 match both terms; rows 0 and 3 match only one term + GlobalIndexResult result = + table.newFullTextSearchBuilder() + .withQueryText("Paimon search") + .withLimit(2) + .withTextColumn(TEXT_FIELD_NAME) + .executeLocal(); + + assertThat(result).isInstanceOf(ScoredGlobalIndexResult.class); + assertThat(result.results().isEmpty()).isFalse(); + + ReadBuilder readBuilder = table.newReadBuilder(); + TableScan.Plan plan = readBuilder.newScan().withGlobalIndexResult(result).plan(); + List<Integer> ids = new ArrayList<>(); + try (RecordReader<InternalRow> reader = readBuilder.newRead().createReader(plan)) { + reader.forEachRemaining(row -> ids.add(row.getInt(0))); + } + + assertThat(ids).hasSize(2); + // Rows 1 and 2 contain both "Paimon" and "search" + assertThat(ids).contains(1, 2); + } + + @Test + public void testFullTextSearchEmptyResult() throws Exception { + createTableDefault(); + FileStoreTable table = getTableDefault(); + + // Write data but no index - should return empty result + String[] documents = {"hello world", "foo bar"}; + writeDocuments(table, documents); + + GlobalIndexResult result = + table.newFullTextSearchBuilder() + .withQueryText("nonexistent") + .withLimit(1) + .withTextColumn(TEXT_FIELD_NAME) + .executeLocal(); + + assertThat(result.results().isEmpty()).isTrue(); + } + + @Test + public void testFullTextSearchTopKLimit() throws Exception { + createTableDefault(); + FileStoreTable table =
getTableDefault(); + + String[] documents = new String[20]; + for (int i = 0; i < 20; i++) { + documents[i] = "document number " + i + " with common keyword"; + } + + writeDocuments(table, documents); + buildAndCommitIndex(table, documents); + + // Search with limit=5 + GlobalIndexResult result = + table.newFullTextSearchBuilder() + .withQueryText("keyword") + .withLimit(5) + .withTextColumn(TEXT_FIELD_NAME) + .executeLocal(); + + ReadBuilder readBuilder = table.newReadBuilder(); + TableScan.Plan plan = readBuilder.newScan().withGlobalIndexResult(result).plan(); + List<Integer> ids = new ArrayList<>(); + try (RecordReader<InternalRow> reader = readBuilder.newRead().createReader(plan)) { + reader.forEachRemaining(row -> ids.add(row.getInt(0))); + } + + assertThat(ids.size()).isLessThanOrEqualTo(5); + } + + @Test + public void testFullTextSearchWithMultipleIndexFiles() throws Exception { + createTableDefault(); + FileStoreTable table = getTableDefault(); + + String[] allDocuments = { + "Apache Paimon lake format", // row 0 + "Paimon supports streaming", // row 1 + "batch processing engine", // row 2 + "Paimon full-text search", // row 3 + "vector similarity search", // row 4 + "Paimon ACID transactions" // row 5 + }; + + writeDocuments(table, allDocuments); + + // Build two separate index files covering different row ranges + buildAndCommitMultipleIndexFiles(table, allDocuments); + + // Query "Paimon" - results should span across both index files + GlobalIndexResult result = + table.newFullTextSearchBuilder() + .withQueryText("Paimon") + .withLimit(4) + .withTextColumn(TEXT_FIELD_NAME) + .executeLocal(); + + assertThat(result).isInstanceOf(ScoredGlobalIndexResult.class); + assertThat(result.results().isEmpty()).isFalse(); + + ReadBuilder readBuilder = table.newReadBuilder(); + TableScan.Plan plan = readBuilder.newScan().withGlobalIndexResult(result).plan(); + List<Integer> ids = new ArrayList<>(); + try (RecordReader<InternalRow> reader = 
readBuilder.newRead().createReader(plan)) { + reader.forEachRemaining(row -> ids.add(row.getInt(0))); + } + + assertThat(ids).isNotEmpty(); + // Rows 0,1 are in first index file, rows 3,5 are in second + assertThat(ids).containsAnyOf(0, 1); + assertThat(ids).containsAnyOf(3, 5); + } + + @Test + public void testFullTextSearchNoMatchingDocuments() throws Exception { + createTableDefault(); + FileStoreTable table = getTableDefault(); + + String[] documents = { + "Apache Paimon lake format", "streaming batch processing", "ACID transactions support" + }; + + writeDocuments(table, documents); + buildAndCommitIndex(table, documents); + + // Query a term that doesn't exist in any document + GlobalIndexResult result = + table.newFullTextSearchBuilder() + .withQueryText("nonexistent") + .withLimit(3) + .withTextColumn(TEXT_FIELD_NAME) + .executeLocal(); + + assertThat(result.results().isEmpty()).isTrue(); + } + + @Test + public void testFullTextSearchCaseInsensitive() throws Exception { + createTableDefault(); + FileStoreTable table = getTableDefault(); + + String[] documents = { + "Apache PAIMON Lake Format", "paimon supports search", "Paimon Is Great" + }; + + writeDocuments(table, documents); + buildAndCommitIndex(table, documents); + + // Query lowercase "paimon" should match all three (case-insensitive) + GlobalIndexResult result = + table.newFullTextSearchBuilder() + .withQueryText("paimon") + .withLimit(3) + .withTextColumn(TEXT_FIELD_NAME) + .executeLocal(); + + assertThat(result).isInstanceOf(ScoredGlobalIndexResult.class); + + ReadBuilder readBuilder = table.newReadBuilder(); + TableScan.Plan plan = readBuilder.newScan().withGlobalIndexResult(result).plan(); + List<Integer> ids = new ArrayList<>(); + try (RecordReader<InternalRow> reader = readBuilder.newRead().createReader(plan)) { + reader.forEachRemaining(row -> ids.add(row.getInt(0))); + } + + assertThat(ids).hasSize(3); + assertThat(ids).contains(0, 1, 2); + } + + // ====================== Helper methods 
====================== + + private void writeDocuments(FileStoreTable table, String[] documents) throws Exception { + BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder(); + try (BatchTableWrite write = writeBuilder.newWrite(); + BatchTableCommit commit = writeBuilder.newCommit()) { + for (int i = 0; i < documents.length; i++) { + write.write(GenericRow.of(i, BinaryString.fromString(documents[i]))); + } + commit.commit(write.prepareCommit()); + } + } + + private void buildAndCommitIndex(FileStoreTable table, String[] documents) throws Exception { + Options options = table.coreOptions().toConfiguration(); + DataField textField = table.rowType().getField(TEXT_FIELD_NAME); + + GlobalIndexSingletonWriter writer = + (GlobalIndexSingletonWriter) + GlobalIndexBuilderUtils.createIndexWriter( + table, + TestFullTextGlobalIndexerFactory.IDENTIFIER, + textField, + options); + for (String doc : documents) { + writer.write(doc); + } + List<ResultEntry> entries = writer.finish(); + + Range rowRange = new Range(0, documents.length - 1); + List<IndexFileMeta> indexFiles = + GlobalIndexBuilderUtils.toIndexFileMetas( + table.fileIO(), + table.store().pathFactory().globalIndexFileFactory(), + table.coreOptions(), + rowRange, + textField.id(), + TestFullTextGlobalIndexerFactory.IDENTIFIER, + entries); + + DataIncrement dataIncrement = DataIncrement.indexIncrement(indexFiles); + CommitMessage message = + new CommitMessageImpl( + BinaryRow.EMPTY_ROW, + 0, + null, + dataIncrement, + CompactIncrement.emptyIncrement()); + try (BatchTableCommit commit = table.newBatchWriteBuilder().newCommit()) { + commit.commit(Collections.singletonList(message)); + } + } + + private void buildAndCommitMultipleIndexFiles(FileStoreTable table, String[] documents) + throws Exception { + Options options = table.coreOptions().toConfiguration(); + DataField textField = table.rowType().getField(TEXT_FIELD_NAME); + int mid = documents.length / 2; + + // Build first index file covering rows [0, mid) + 
GlobalIndexSingletonWriter writer1 = + (GlobalIndexSingletonWriter) + GlobalIndexBuilderUtils.createIndexWriter( + table, + TestFullTextGlobalIndexerFactory.IDENTIFIER, + textField, + options); + for (int i = 0; i < mid; i++) { + writer1.write(documents[i]); + } + List<ResultEntry> entries1 = writer1.finish(); + Range rowRange1 = new Range(0, mid - 1); + List<IndexFileMeta> indexFiles1 = + GlobalIndexBuilderUtils.toIndexFileMetas( + table.fileIO(), + table.store().pathFactory().globalIndexFileFactory(), + table.coreOptions(), + rowRange1, + textField.id(), + TestFullTextGlobalIndexerFactory.IDENTIFIER, + entries1); + + // Build second index file covering rows [mid, end) + GlobalIndexSingletonWriter writer2 = + (GlobalIndexSingletonWriter) + GlobalIndexBuilderUtils.createIndexWriter( + table, + TestFullTextGlobalIndexerFactory.IDENTIFIER, + textField, + options); + for (int i = mid; i < documents.length; i++) { + writer2.write(documents[i]); + } + List<ResultEntry> entries2 = writer2.finish(); + Range rowRange2 = new Range(mid, documents.length - 1); + List<IndexFileMeta> indexFiles2 = + GlobalIndexBuilderUtils.toIndexFileMetas( + table.fileIO(), + table.store().pathFactory().globalIndexFileFactory(), + table.coreOptions(), + rowRange2, + textField.id(), + TestFullTextGlobalIndexerFactory.IDENTIFIER, + entries2); + + // Combine all index files and commit together + List<IndexFileMeta> allIndexFiles = new ArrayList<>(); + allIndexFiles.addAll(indexFiles1); + allIndexFiles.addAll(indexFiles2); + + DataIncrement dataIncrement = DataIncrement.indexIncrement(allIndexFiles); + CommitMessage message = + new CommitMessageImpl( + BinaryRow.EMPTY_ROW, + 0, + null, + dataIncrement, + CompactIncrement.emptyIncrement()); + try (BatchTableCommit commit = table.newBatchWriteBuilder().newCommit()) { + commit.commit(Collections.singletonList(message)); + } + } +} diff --git a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonBaseScanBuilder.scala 
b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonBaseScanBuilder.scala index 4f5451c95d..f6e3b97c81 100644 --- a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonBaseScanBuilder.scala +++ b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonBaseScanBuilder.scala @@ -22,6 +22,7 @@ import org.apache.paimon.CoreOptions import org.apache.paimon.partition.PartitionPredicate import org.apache.paimon.partition.PartitionPredicate.splitPartitionPredicatesAndDataPredicates import org.apache.paimon.predicate.{PartitionPredicateVisitor, Predicate, TopN, VectorSearch} +import org.apache.paimon.predicate.FullTextSearch import org.apache.paimon.table.SpecialFields.rowTypeWithRowTracking import org.apache.paimon.table.Table import org.apache.paimon.types.RowType @@ -53,6 +54,7 @@ abstract class PaimonBaseScanBuilder protected var pushedLimit: Option[Int] = None protected var pushedTopN: Option[TopN] = None protected var pushedVectorSearch: Option[VectorSearch] = None + protected var pushedFullTextSearch: Option[FullTextSearch] = None protected var requiredSchema: StructType = SparkTypeUtils.fromPaimonRowType(table.rowType()) diff --git a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonScan.scala b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonScan.scala index 2ad14bd055..f5a5c8e95e 100644 --- a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonScan.scala +++ b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonScan.scala @@ -19,7 +19,7 @@ package org.apache.paimon.spark import org.apache.paimon.partition.PartitionPredicate -import org.apache.paimon.predicate.{Predicate, TopN, VectorSearch} +import org.apache.paimon.predicate.{FullTextSearch, Predicate, TopN, VectorSearch} import org.apache.paimon.table.InnerTable import org.apache.spark.sql.types.StructType @@ -32,5 +32,6 @@ case class PaimonScan( 
override val pushedLimit: Option[Int] = None, override val pushedTopN: Option[TopN] = None, override val pushedVectorSearch: Option[VectorSearch] = None, + override val pushedFullTextSearch: Option[FullTextSearch] = None, bucketedScanDisabled: Boolean = true) extends PaimonBaseScan(table) {} diff --git a/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/PaimonScan.scala b/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/PaimonScan.scala index 1da9b17a81..707ee13459 100644 --- a/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/PaimonScan.scala +++ b/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/PaimonScan.scala @@ -19,7 +19,7 @@ package org.apache.paimon.spark import org.apache.paimon.partition.PartitionPredicate -import org.apache.paimon.predicate.{Predicate, TopN, VectorSearch} +import org.apache.paimon.predicate.{FullTextSearch, Predicate, TopN, VectorSearch} import org.apache.paimon.table.{BucketMode, FileStoreTable, InnerTable} import org.apache.paimon.table.source.{DataSplit, Split} @@ -36,6 +36,7 @@ case class PaimonScan( override val pushedLimit: Option[Int], override val pushedTopN: Option[TopN], override val pushedVectorSearch: Option[VectorSearch], + override val pushedFullTextSearch: Option[FullTextSearch] = None, bucketedScanDisabled: Boolean = false) extends PaimonBaseScan(table) with SupportsReportPartitioning { diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScan.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScan.scala index bd884d64b1..30a1621925 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScan.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScan.scala @@ -45,7 +45,7 @@ abstract class PaimonBaseScan(table: InnerTable) protected def getInputSplits: Array[Split] = { readBuilder 
.newScan() - .withGlobalIndexResult(evalVectorSearch()) + .withGlobalIndexResult(evalGlobalIndexSearch()) .asInstanceOf[InnerTableScan] .withMetricRegistry(paimonMetricsRegistry) .plan() @@ -54,11 +54,21 @@ abstract class PaimonBaseScan(table: InnerTable) .toArray } - private def evalVectorSearch(): GlobalIndexResult = { - if (pushedVectorSearch.isEmpty) { - return null + private def evalGlobalIndexSearch(): GlobalIndexResult = { + if (pushedVectorSearch.isDefined && pushedFullTextSearch.isDefined) { + throw new UnsupportedOperationException( + "Cannot push down both vector search and full-text search simultaneously.") + } + if (pushedVectorSearch.isDefined) { + return evalVectorSearch() } + if (pushedFullTextSearch.isDefined) { + return evalFullTextSearch() + } + null + } + private def evalVectorSearch(): GlobalIndexResult = { val vectorSearch = pushedVectorSearch.get val vectorBuilder = table .newVectorSearchBuilder() @@ -74,6 +84,16 @@ abstract class PaimonBaseScan(table: InnerTable) vectorBuilder.newVectorRead().read(vectorBuilder.newVectorScan().scan()) } + private def evalFullTextSearch(): GlobalIndexResult = { + val fullTextSearch = pushedFullTextSearch.get + val ftBuilder = table + .newFullTextSearchBuilder() + .withQueryText(fullTextSearch.queryText()) + .withTextColumn(fullTextSearch.fieldName()) + .withLimit(fullTextSearch.limit()) + ftBuilder.newFullTextRead().read(ftBuilder.newFullTextScan().scan()) + } + override def toBatch: Batch = { ensureNoFullScan() super.toBatch diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScanBuilder.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScanBuilder.scala index 47723171e4..14a090ff14 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScanBuilder.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScanBuilder.scala @@ -22,6 +22,7 @@ import 
org.apache.paimon.CoreOptions import org.apache.paimon.partition.PartitionPredicate import org.apache.paimon.partition.PartitionPredicate.splitPartitionPredicatesAndDataPredicates import org.apache.paimon.predicate.{PartitionPredicateVisitor, Predicate, TopN, VectorSearch} +import org.apache.paimon.predicate.FullTextSearch import org.apache.paimon.table.{SpecialFields, Table} import org.apache.paimon.types.RowType @@ -53,6 +54,7 @@ abstract class PaimonBaseScanBuilder protected var pushedLimit: Option[Int] = None protected var pushedTopN: Option[TopN] = None protected var pushedVectorSearch: Option[VectorSearch] = None + protected var pushedFullTextSearch: Option[FullTextSearch] = None protected var requiredSchema: StructType = SparkTypeUtils.fromPaimonRowType(table.rowType()) diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScan.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScan.scala index 12471f5787..0fa4d73054 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScan.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScan.scala @@ -20,7 +20,7 @@ package org.apache.paimon.spark import org.apache.paimon.CoreOptions.BucketFunctionType import org.apache.paimon.partition.PartitionPredicate -import org.apache.paimon.predicate.{Predicate, TopN, VectorSearch} +import org.apache.paimon.predicate.{FullTextSearch, Predicate, TopN, VectorSearch} import org.apache.paimon.spark.commands.BucketExpression.quote import org.apache.paimon.table.{BucketMode, FileStoreTable, InnerTable} import org.apache.paimon.table.source.{DataSplit, Split} @@ -42,6 +42,7 @@ case class PaimonScan( override val pushedLimit: Option[Int], override val pushedTopN: Option[TopN], override val pushedVectorSearch: Option[VectorSearch], + override val pushedFullTextSearch: Option[FullTextSearch] = None, bucketedScanDisabled: Boolean = false) extends 
PaimonBaseScan(table) with SupportsReportPartitioning diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScanBuilder.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScanBuilder.scala index 2bf7a8ddd4..7e1989283b 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScanBuilder.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScanBuilder.scala @@ -128,7 +128,7 @@ class PaimonScanBuilder(val table: InnerTable) localScan match { case Some(scan) => scan case None => - val (actualTable, vectorSearch) = table match { + val (actualTable, vectorSearch, fullTextSearch) = table match { case vst: org.apache.paimon.table.VectorSearchTable => val tableVectorSearch = Option(vst.vectorSearch()) val vs = (tableVectorSearch, pushedVectorSearch) match { @@ -136,8 +136,10 @@ class PaimonScanBuilder(val table: InnerTable) case (None, Some(_)) => pushedVectorSearch case (None, None) => None } - (vst.origin(), vs) - case _ => (table, pushedVectorSearch) + (vst.origin(), vs, None) + case ftst: org.apache.paimon.table.FullTextSearchTable => + (ftst.origin(), None, Option(ftst.fullTextSearch())) + case _ => (table, pushedVectorSearch, pushedFullTextSearch) } PaimonScan( @@ -147,7 +149,8 @@ class PaimonScanBuilder(val table: InnerTable) pushedDataFilters, pushedLimit, pushedTopN, - vectorSearch) + vectorSearch, + fullTextSearch) } } } diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/PaimonTableValuedFunctions.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/PaimonTableValuedFunctions.scala index 6bb6004db8..d570f60303 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/PaimonTableValuedFunctions.scala +++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/PaimonTableValuedFunctions.scala @@ -19,10 +19,10 @@ package org.apache.paimon.spark.catalyst.plans.logical import org.apache.paimon.CoreOptions -import org.apache.paimon.predicate.VectorSearch +import org.apache.paimon.predicate.{FullTextSearch, VectorSearch} import org.apache.paimon.spark.SparkTable import org.apache.paimon.spark.catalyst.plans.logical.PaimonTableValuedFunctions._ -import org.apache.paimon.table.{DataTable, InnerTable, VectorSearchTable} +import org.apache.paimon.table.{DataTable, FullTextSearchTable, InnerTable, VectorSearchTable} import org.apache.paimon.table.source.snapshot.TimeTravelUtil.InconsistentTagBucketException import org.apache.spark.sql.PaimonUtils.createDataset @@ -44,9 +44,15 @@ object PaimonTableValuedFunctions { val INCREMENTAL_BETWEEN_TIMESTAMP = "paimon_incremental_between_timestamp" val INCREMENTAL_TO_AUTO_TAG = "paimon_incremental_to_auto_tag" val VECTOR_SEARCH = "vector_search" + val FULL_TEXT_SEARCH = "full_text_search" val supportedFnNames: Seq[String] = - Seq(INCREMENTAL_QUERY, INCREMENTAL_BETWEEN_TIMESTAMP, INCREMENTAL_TO_AUTO_TAG, VECTOR_SEARCH) + Seq( + INCREMENTAL_QUERY, + INCREMENTAL_BETWEEN_TIMESTAMP, + INCREMENTAL_TO_AUTO_TAG, + VECTOR_SEARCH, + FULL_TEXT_SEARCH) private type TableFunctionDescription = (FunctionIdentifier, ExpressionInfo, TableFunctionBuilder) @@ -60,6 +66,8 @@ object PaimonTableValuedFunctions { FunctionRegistryBase.build[IncrementalToAutoTag](fnName, since = None) case VECTOR_SEARCH => FunctionRegistryBase.build[VectorSearchQuery](fnName, since = None) + case FULL_TEXT_SEARCH => + FunctionRegistryBase.build[FullTextSearchQuery](fnName, since = None) case _ => throw new Exception(s"Function $fnName isn't a supported table valued function.") } @@ -90,10 +98,12 @@ object PaimonTableValuedFunctions { val ident: Identifier = Identifier.of(Array(dbName), tableName) val sparkTable = 
sparkCatalog.loadTable(ident) - // Handle vector_search specially + // Handle vector_search and full_text_search specially tvf match { case vsq: VectorSearchQuery => resolveVectorSearchQuery(sparkTable, sparkCatalog, ident, vsq, args.tail) + case ftsq: FullTextSearchQuery => + resolveFullTextSearchQuery(sparkTable, sparkCatalog, ident, ftsq, args.tail) case _ => val options = tvf.parseArgs(args.tail) usingSparkIncrementQuery(tvf, sparkTable, options) match { @@ -131,6 +141,28 @@ object PaimonTableValuedFunctions { } } + private def resolveFullTextSearchQuery( + sparkTable: Table, + sparkCatalog: TableCatalog, + ident: Identifier, + ftsq: FullTextSearchQuery, + argsWithoutTable: Seq[Expression]): LogicalPlan = { + sparkTable match { + case st @ SparkTable(innerTable: InnerTable) => + val fullTextSearch = ftsq.createFullTextSearch(innerTable, argsWithoutTable) + val fullTextSearchTable = FullTextSearchTable.create(innerTable, fullTextSearch) + DataSourceV2Relation.create( + st.copy(table = fullTextSearchTable), + Some(sparkCatalog), + Some(ident), + CaseInsensitiveStringMap.empty()) + case _ => + throw new RuntimeException( + "full_text_search only supports Paimon SparkTable backed by InnerTable, " + + s"but got table implementation: ${sparkTable.getClass.getName}") + } + } + private def usingSparkIncrementQuery( tvf: PaimonTableValueFunction, sparkTable: Table, @@ -306,3 +338,52 @@ case class VectorSearchQuery(override val args: Seq[Expression]) } } } + +/** + * Plan for the [[FULL_TEXT_SEARCH]] table-valued function. 
+ * + * Usage: full_text_search(table_name, column_name, query_text, limit) + * - table_name: the Paimon table to search + * - column_name: the text column name + * - query_text: the query text string + * - limit: the number of top results to return + * + * Example: SELECT * FROM full_text_search('T', 'content', 'hello world', 10) + */ +case class FullTextSearchQuery(override val args: Seq[Expression]) + extends PaimonTableValueFunction(FULL_TEXT_SEARCH) { + + override def parseArgs(args: Seq[Expression]): Map[String, String] = { + // This method is not used for FullTextSearchQuery as we handle it specially + Map.empty + } + + def createFullTextSearch( + innerTable: InnerTable, + argsWithoutTable: Seq[Expression]): FullTextSearch = { + if (argsWithoutTable.size != 3) { + throw new RuntimeException( + s"$FULL_TEXT_SEARCH needs three parameters after table_name: column_name, query_text, limit. " + + s"Got ${argsWithoutTable.size} parameters after table_name." + ) + } + val columnName = argsWithoutTable.head.eval().toString + if (!innerTable.rowType().containsField(columnName)) { + throw new RuntimeException( + s"Column $columnName does not exist in table ${innerTable.name()}" + ) + } + val queryText = argsWithoutTable(1).eval().toString + val limit = argsWithoutTable(2).eval() match { + case i: Int => i + case l: Long => l.toInt + case other => throw new RuntimeException(s"Invalid limit type: ${other.getClass.getName}") + } + if (limit <= 0) { + throw new IllegalArgumentException( + s"Limit must be a positive integer, but got: $limit" + ) + } + new FullTextSearch(queryText, limit, columnName) + } +} diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/read/BaseScan.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/read/BaseScan.scala index 5260ef8e09..42f724d3e6 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/read/BaseScan.scala +++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/read/BaseScan.scala @@ -20,7 +20,7 @@ package org.apache.paimon.spark.read import org.apache.paimon.CoreOptions import org.apache.paimon.partition.PartitionPredicate -import org.apache.paimon.predicate.{Predicate, TopN, VectorSearch} +import org.apache.paimon.predicate.{FullTextSearch, Predicate, TopN, VectorSearch} import org.apache.paimon.spark.{PaimonBatch, PaimonInputPartition, PaimonNumSplitMetric, PaimonPartitionSizeMetric, PaimonReadBatchTimeMetric, PaimonResultedTableFilesMetric, PaimonResultedTableFilesTaskMetric, SparkTypeUtils} import org.apache.paimon.spark.schema.PaimonMetadataColumn import org.apache.paimon.spark.schema.PaimonMetadataColumn._ @@ -51,6 +51,7 @@ trait BaseScan extends Scan with SupportsReportStatistics with Logging { def pushedLimit: Option[Int] = None def pushedTopN: Option[TopN] = None def pushedVectorSearch: Option[VectorSearch] = None + def pushedFullTextSearch: Option[FullTextSearch] = None // Runtime push down val pushedRuntimePartitionFilters: ListBuffer[PartitionPredicate] = ListBuffer.empty @@ -187,6 +188,7 @@ trait BaseScan extends Scan with SupportsReportStatistics with Logging { pushedDataFiltersStr + pushedTopN.map(topN => s", TopN: [$topN]").getOrElse("") + pushedLimit.map(limit => s", Limit: [$limit]").getOrElse("") + - pushedVectorSearch.map(vs => s", VectorSearch: [$vs]").getOrElse("") + pushedVectorSearch.map(vs => s", VectorSearch: [$vs]").getOrElse("") + + pushedFullTextSearch.map(fts => s", FullTextSearch: [$fts]").getOrElse("") } } diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/FullTextSearchTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/FullTextSearchTest.scala new file mode 100644 index 0000000000..8f6d06fd75 --- /dev/null +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/FullTextSearchTest.scala @@ -0,0 +1,223 @@ +/* + * Licensed to the 
Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.sql + +import org.apache.paimon.spark.PaimonSparkTestBase + +import scala.collection.JavaConverters._ + +/** Tests for full-text search read/write operations using test-only brute-force full-text index. 
*/ +class FullTextSearchTest extends PaimonSparkTestBase { + + private val indexType = "test-fulltext" + + // ========== Index Creation Tests ========== + + test("create full-text index - basic") { + withTable("T") { + spark.sql(""" + |CREATE TABLE T (id INT, content STRING) + |TBLPROPERTIES ( + | 'bucket' = '-1', + | 'global-index.row-count-per-shard' = '10000', + | 'row-tracking.enabled' = 'true', + | 'data-evolution.enabled' = 'true') + |""".stripMargin) + + val values = (0 until 100) + .map(i => s"($i, 'document number $i about paimon lake format')") + .mkString(",") + spark.sql(s"INSERT INTO T VALUES $values") + + val output = spark + .sql( + s"CALL sys.create_global_index(table => 'test.T', index_column => 'content', index_type => '$indexType')") + .collect() + .head + assert(output.getBoolean(0)) + + val table = loadTable("T") + val indexEntries = table + .store() + .newIndexFileHandler() + .scanEntries() + .asScala + .filter(_.indexFile().indexType() == indexType) + + assert(indexEntries.nonEmpty) + val totalRowCount = indexEntries.map(_.indexFile().rowCount()).sum + assert(totalRowCount == 100L) + } + } + + // ========== Index Read/Search Tests ========== + + test("full-text search - basic search") { + withTable("T") { + spark.sql(""" + |CREATE TABLE T (id INT, content STRING) + |TBLPROPERTIES ( + | 'bucket' = '-1', + | 'global-index.row-count-per-shard' = '10000', + | 'row-tracking.enabled' = 'true', + | 'data-evolution.enabled' = 'true') + |""".stripMargin) + + val values = (0 until 100) + .map(i => s"($i, 'document number $i about paimon lake format')") + .mkString(",") + spark.sql(s"INSERT INTO T VALUES $values") + + spark + .sql( + s"CALL sys.create_global_index(table => 'test.T', index_column => 'content', index_type => '$indexType')") + .collect() + + val result = spark + .sql(""" + |SELECT * FROM full_text_search('T', 'content', 'paimon', 5) + |""".stripMargin) + .collect() + assert(result.length == 5) + } + } + + test("full-text search - top-k 
with different k values") { + withTable("T") { + spark.sql(""" + |CREATE TABLE T (id INT, content STRING) + |TBLPROPERTIES ( + | 'bucket' = '-1', + | 'global-index.row-count-per-shard' = '10000', + | 'row-tracking.enabled' = 'true', + | 'data-evolution.enabled' = 'true') + |""".stripMargin) + + val values = (0 until 200) + .map(i => s"($i, 'document number $i about paimon lake format')") + .mkString(",") + spark.sql(s"INSERT INTO T VALUES $values") + + spark + .sql( + s"CALL sys.create_global_index(table => 'test.T', index_column => 'content', index_type => '$indexType')") + .collect() + + // Test with k=1 + var result = spark + .sql(""" + |SELECT * FROM full_text_search('T', 'content', 'paimon', 1) + |""".stripMargin) + .collect() + assert(result.length == 1) + + // Test with k=10 + result = spark + .sql(""" + |SELECT * FROM full_text_search('T', 'content', 'paimon', 10) + |""".stripMargin) + .collect() + assert(result.length == 10) + } + } + + test("full-text search - multi-term query") { + withTable("T") { + spark.sql(""" + |CREATE TABLE T (id INT, content STRING) + |TBLPROPERTIES ( + | 'bucket' = '-1', + | 'global-index.row-count-per-shard' = '10000', + | 'row-tracking.enabled' = 'true', + | 'data-evolution.enabled' = 'true') + |""".stripMargin) + + spark.sql(""" + |INSERT INTO T VALUES + | (0, 'Apache Paimon lake format'), + | (1, 'Paimon supports full-text search'), + | (2, 'full-text search in Apache Paimon'), + | (3, 'vector similarity search'), + | (4, 'streaming batch processing') + |""".stripMargin) + + spark + .sql( + s"CALL sys.create_global_index(table => 'test.T', index_column => 'content', index_type => '$indexType')") + .collect() + + // Query "Paimon search" - rows 1 and 2 match both terms + val result = spark + .sql(""" + |SELECT id FROM full_text_search('T', 'content', 'Paimon search', 2) + |ORDER BY id + |""".stripMargin) + .collect() + + assert(result.length == 2) + val ids = result.map(_.getInt(0)).toSet + assert(ids.contains(1)) + 
assert(ids.contains(2)) + } + } + + // ========== Integration Tests ========== + + test("end-to-end: write, index, search cycle") { + withTable("T") { + spark.sql(""" + |CREATE TABLE T (id INT, title STRING, content STRING) + |TBLPROPERTIES ( + | 'bucket' = '-1', + | 'global-index.row-count-per-shard' = '10000', + | 'row-tracking.enabled' = 'true', + | 'data-evolution.enabled' = 'true') + |""".stripMargin) + + val values = (0 until 1000) + .map(i => s"($i, 'title_$i', 'document number $i about paimon lake format')") + .mkString(",") + spark.sql(s"INSERT INTO T VALUES $values") + + val indexResult = spark + .sql( + s"CALL sys.create_global_index(table => 'test.T', index_column => 'content', index_type => '$indexType')") + .collect() + .head + assert(indexResult.getBoolean(0)) + + val table = loadTable("T") + val indexEntries = table + .store() + .newIndexFileHandler() + .scanEntries() + .asScala + .filter(_.indexFile().indexType() == indexType) + assert(indexEntries.nonEmpty) + + val searchResult = spark + .sql(""" + |SELECT id, title FROM full_text_search('T', 'content', 'paimon', 10) + |""".stripMargin) + .collect() + + assert(searchResult.length == 10) + } + } +}
