modules\indexing\src\main\java\org\apache\ignite\internal\processors\query\h2\opt\GridLuceneIndex.java ```java /* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */
package org.apache.ignite.internal.processors.query.h2.opt; import java.io.IOException; import java.util.Collection; import java.util.concurrent.atomic.AtomicLong; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.CacheObjectContext; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.query.GridQueryIndexDescriptor; import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor; import org.apache.ignite.internal.util.GridAtomicLong; import org.apache.ignite.internal.util.GridCloseableIteratorAdapter; import org.apache.ignite.internal.util.lang.GridCloseableIterator; import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiTuple; import org.apache.ignite.spi.indexing.IndexingQueryFilter; import org.apache.ignite.spi.indexing.IndexingQueryCacheFilter; import org.apache.lucene.analysis.standard.StandardAnalyzer; import org.apache.lucene.document.Document; import org.apache.lucene.document.Field; import org.apache.lucene.document.LongField; import org.apache.lucene.document.StoredField; import org.apache.lucene.document.StringField; import org.apache.lucene.document.TextField; import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.IndexReader; import org.apache.lucene.index.IndexWriter; import org.apache.lucene.index.IndexWriterConfig; import org.apache.lucene.index.Term; import org.apache.lucene.queryparser.classic.MultiFieldQueryParser; import org.apache.lucene.search.BooleanClause; import org.apache.lucene.search.BooleanQuery; import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.NumericRangeQuery; import org.apache.lucene.search.Query; import org.apache.lucene.search.ScoreDoc; import org.apache.lucene.search.TopDocs; import org.apache.lucene.util.BytesRef; import org.h2.util.JdbcUtils; import org.jetbrains.annotations.Nullable; import static org.apache.ignite.internal.processors.query.QueryUtils.KEY_FIELD_NAME; import static org.apache.ignite.internal.processors.query.QueryUtils.VAL_FIELD_NAME; /** * Lucene fulltext index. */ public class GridLuceneIndex implements AutoCloseable { /** Field name for string representation of value. */ public static final String VAL_STR_FIELD_NAME = "_gg_val_str__"; /** Field name for value version. */ public static final String VER_FIELD_NAME = "_gg_ver__"; /** Field name for value expiration time. */ public static final String EXPIRATION_TIME_FIELD_NAME = "_gg_expires__"; /** */ private final String cacheName; /** */ private final GridQueryTypeDescriptor type; /** */ private final IndexWriter writer; /** */ private final String[] idxdFields; /** */ private final AtomicLong updateCntr = new GridAtomicLong(); /** */ private final GridLuceneDirectory dir; /** */ private final GridKernalContext ctx; /** * Constructor. * * @param ctx Kernal context. * @param cacheName Cache name. * @param type Type descriptor. * @throws IgniteCheckedException If failed. */ public GridLuceneIndex(GridKernalContext ctx, @Nullable String cacheName, GridQueryTypeDescriptor type) throws IgniteCheckedException { this.ctx = ctx; this.cacheName = cacheName; this.type = type; dir = new GridLuceneDirectory(new GridUnsafeMemory(0)); try { writer = new IndexWriter(dir, new IndexWriterConfig(new StandardAnalyzer())); } catch (IOException e) { throw new IgniteCheckedException(e); } GridQueryIndexDescriptor idx = type.textIndex(); if (idx != null) { Collection<String> fields = idx.fields(); idxdFields = new String[fields.size() + 1]; fields.toArray(idxdFields); } else { assert type.valueTextIndex() || type.valueClass() == String.class; idxdFields = new String[1]; } idxdFields[idxdFields.length - 1] = VAL_STR_FIELD_NAME; } /** * @return Cache object context. */ private CacheObjectContext objectContext() { if (ctx == null) return null; return ctx.cache().internalCache(cacheName).context().cacheObjectContext(); } /** * Stores given data in this fulltext index. * * @param k Key. * @param v Value. * @param ver Version. * @param expires Expiration time. * @throws IgniteCheckedException If failed. */ @SuppressWarnings("ConstantConditions") public void store(CacheObject k, CacheObject v, GridCacheVersion ver, long expires) throws IgniteCheckedException { CacheObjectContext coctx = objectContext(); Object key = k.isPlatformType() ? k.value(coctx, false) : k; Object val = v.isPlatformType() ? v.value(coctx, false) : v; Document doc = new Document(); boolean stringsFound = false; if (type.valueTextIndex() || type.valueClass() == String.class) { doc.add(new TextField(VAL_STR_FIELD_NAME, val.toString(), Field.Store.YES)); stringsFound = true; } for (int i = 0, last = idxdFields.length - 1; i < last; i++) { Object fieldVal = type.value(idxdFields[i], key, val); if (fieldVal != null) { doc.add(new TextField(idxdFields[i], fieldVal.toString(), Field.Store.YES)); stringsFound = true; } } BytesRef keyByteRef = new BytesRef(k.valueBytes(coctx)); try { final Term term = new Term(KEY_FIELD_NAME, keyByteRef); if (!stringsFound) { writer.deleteDocuments(term); return; // We did not find any strings to be indexed, will not store data at all. } doc.add(new StringField(KEY_FIELD_NAME, keyByteRef, Field.Store.YES)); if (type.valueClass() != String.class) doc.add(new StoredField(VAL_FIELD_NAME, v.valueBytes(coctx))); doc.add(new StoredField(VER_FIELD_NAME, ver.toString().getBytes())); doc.add(new LongField(EXPIRATION_TIME_FIELD_NAME, expires, Field.Store.YES)); // Next implies remove than add atomically operation. writer.updateDocument(term, doc); } catch (IOException e) { throw new IgniteCheckedException(e); } finally { updateCntr.incrementAndGet(); } } /** * Removes entry for given key from this index. * * @param key Key. * @throws IgniteCheckedException If failed. */ public void remove(CacheObject key) throws IgniteCheckedException { try { writer.deleteDocuments(new Term(KEY_FIELD_NAME, new BytesRef(key.valueBytes(objectContext())))); } catch (IOException e) { throw new IgniteCheckedException(e); } finally { updateCntr.incrementAndGet(); } } /** * Runs lucene fulltext query over this index. * * @param qry Query. * @param filters Filters over result. * @param pageSize Size of batch * @return Query result. * @throws IgniteCheckedException If failed. */ public <K, V> GridCloseableIterator<IgniteBiTuple<K, V>> query(String qry, IndexingQueryFilter filters, int pageSize) throws IgniteCheckedException { IndexReader reader; try { long updates = updateCntr.get(); if (updates != 0) { writer.commit(); updateCntr.addAndGet(-updates); } //We can cache reader\searcher and change this to 'openIfChanged' reader = DirectoryReader.open(writer, true); } catch (IOException e) { throw new IgniteCheckedException(e); } IndexSearcher searcher; Query query; try { searcher = new IndexSearcher(reader); MultiFieldQueryParser parser = new MultiFieldQueryParser(idxdFields, writer.getAnalyzer()); // parser.setAllowLeadingWildcard(true); // Filter expired items. Query filter = NumericRangeQuery.newLongRange(EXPIRATION_TIME_FIELD_NAME, U.currentTimeMillis(), null, false, false); query = new BooleanQuery.Builder() .add(parser.parse(qry), BooleanClause.Occur.MUST) .add(filter, BooleanClause.Occur.FILTER) .build(); } catch (Exception e) { U.closeQuiet(reader); throw new IgniteCheckedException(e); } IndexingQueryCacheFilter fltr = null; if (filters != null) fltr = filters.forCache(cacheName); return new It<>(reader, searcher, query, fltr, pageSize); } /** {@inheritDoc} */ @Override public void close() { U.closeQuiet(writer); U.close(dir, ctx.log(GridLuceneIndex.class)); } /** * Key-value iterator over fulltext search result. */ private class It<K, V> extends GridCloseableIteratorAdapter<IgniteBiTuple<K, V>> { private final int BatchPosBeforeHead = -1; /** */ private static final long serialVersionUID = 0L; /** */ private final int pageSize; /** */ private final IndexReader reader; /** */ private final Query query; /** */ private final IndexSearcher searcher; /** current batch docs*/ private ScoreDoc[] batch; /** current position in batch*/ private int batchPos = BatchPosBeforeHead; /** */ private final IndexingQueryCacheFilter filters; /** */ private IgniteBiTuple<K, V> curr; /** */ private CacheObjectContext coctx; /** * Constructor. * * @param reader Reader. * @param searcher Searcher. * @param filters Filters over result. * @throws IgniteCheckedException if failed. */ private It(IndexReader reader, IndexSearcher searcher, Query query, IndexingQueryCacheFilter filters, int pageSize) throws IgniteCheckedException { this.reader = reader; this.searcher = searcher; this.filters = filters; this.query = query; this.pageSize = pageSize; coctx = objectContext(); findNext(); } /** * @param bytes Bytes. * @param ldr Class loader. * @return Object. * @throws IgniteCheckedException If failed. */ @SuppressWarnings("unchecked") private <Z> Z unmarshall(byte[] bytes, ClassLoader ldr) throws IgniteCheckedException { if (coctx == null) // For tests. return (Z)JdbcUtils.deserialize(bytes, null); return (Z)coctx.kernalContext().cacheObjects().unmarshal(coctx, bytes, ldr); } /** * Finds next element. * * @throws IgniteCheckedException If failed. */ @SuppressWarnings("unchecked") private void findNext() throws IgniteCheckedException { curr = null; if(isClosed()) throw new IgniteCheckedException("Iterator already closed"); if (shouldRequestNextBatch()) { try { requestNextBatch(); } catch (IOException e) { close(); throw new IgniteCheckedException(e); } } if(batch == null) return; while (batchPos < batch.length) { Document doc; ScoreDoc scoreDoc =batch[batchPos++]; try { doc = searcher.doc(scoreDoc.doc); } catch (IOException e) { throw new IgniteCheckedException(e); } ClassLoader ldr = null; if (ctx != null && ctx.deploy().enabled()) ldr = ctx.cache().internalCache(cacheName).context().deploy().globalLoader(); K k = unmarshall(doc.getBinaryValue(KEY_FIELD_NAME).bytes, ldr); if (filters != null && !filters.apply(k)) continue; V v = type.valueClass() == String.class ? (V)doc.get(VAL_STR_FIELD_NAME) : this.<V>unmarshall(doc.getBinaryValue(VAL_FIELD_NAME).bytes, ldr); assert v != null; curr = new IgniteBiTuple<>(k, v); break; } } private boolean shouldRequestNextBatch() { if(batch == null){ // should request for first batch return (batchPos == BatchPosBeforeHead) ; } else { // should request when reached to the end of batch return (batchPos == batch.length); } } private void requestNextBatch() throws IOException { TopDocs docs; if (batch == null) { docs = searcher.search(query, pageSize); } else { docs = searcher.searchAfter(batch[batch.length - 1], query, pageSize); } if(docs.scoreDocs.length ==0) { batch = null; }else{ batch = docs.scoreDocs; } batchPos = 0; } /** {@inheritDoc} */ @Override protected IgniteBiTuple<K, V> onNext() throws IgniteCheckedException { IgniteBiTuple<K, V> res = curr; findNext(); return res; } /** {@inheritDoc} */ @Override protected boolean onHasNext() throws IgniteCheckedException { return curr != null; } /** {@inheritDoc} */ @Override protected void onClose() throws IgniteCheckedException { U.closeQuiet(reader); } } } ``` On Fri, Sep 7, 2018 at 9:02 AM Tâm Nguyễn Mạnh <nguyenmanhtam...@gmail.com> wrote: > Hi, > > I tried to implement iterator for GridLuceneInde, could you please help to > review ? > > -- > Thanks & Best Regards > > Tam, Nguyen Manh > > -- Thanks & Best Regards Tam, Nguyen Manh