Hello! Can you please submit this as a GitHub pull request, as per our process? Do you have a ticket for it?
Regards, -- Ilya Kasnacheev пт, 7 сент. 2018 г. в 5:08, Tâm Nguyễn Mạnh <nguyenmanhtam...@gmail.com>: > > modules\indexing\src\main\java\org\apache\ignite\internal\processors\query\h2\opt\GridLuceneIndex.java > ```java > /* > * Licensed to the Apache Software Foundation (ASF) under one or more > * contributor license agreements. See the NOTICE file distributed with > * this work for additional information regarding copyright ownership. > * The ASF licenses this file to You under the Apache License, Version 2.0 > * (the "License"); you may not use this file except in compliance with > * the License. You may obtain a copy of the License at > * > * http://www.apache.org/licenses/LICENSE-2.0 > * > * Unless required by applicable law or agreed to in writing, software > * distributed under the License is distributed on an "AS IS" BASIS, > * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. > * See the License for the specific language governing permissions and > * limitations under the License. 
> */ > > package org.apache.ignite.internal.processors.query.h2.opt; > > import java.io.IOException; > import java.util.Collection; > import java.util.concurrent.atomic.AtomicLong; > import org.apache.ignite.IgniteCheckedException; > import org.apache.ignite.internal.GridKernalContext; > import org.apache.ignite.internal.processors.cache.CacheObject; > import org.apache.ignite.internal.processors.cache.CacheObjectContext; > import > org.apache.ignite.internal.processors.cache.version.GridCacheVersion; > import > org.apache.ignite.internal.processors.query.GridQueryIndexDescriptor; > import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor; > import org.apache.ignite.internal.util.GridAtomicLong; > import org.apache.ignite.internal.util.GridCloseableIteratorAdapter; > import org.apache.ignite.internal.util.lang.GridCloseableIterator; > import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory; > import org.apache.ignite.internal.util.typedef.internal.U; > import org.apache.ignite.lang.IgniteBiTuple; > import org.apache.ignite.spi.indexing.IndexingQueryFilter; > import org.apache.ignite.spi.indexing.IndexingQueryCacheFilter; > import org.apache.lucene.analysis.standard.StandardAnalyzer; > import org.apache.lucene.document.Document; > import org.apache.lucene.document.Field; > import org.apache.lucene.document.LongField; > import org.apache.lucene.document.StoredField; > import org.apache.lucene.document.StringField; > import org.apache.lucene.document.TextField; > import org.apache.lucene.index.DirectoryReader; > import org.apache.lucene.index.IndexReader; > import org.apache.lucene.index.IndexWriter; > import org.apache.lucene.index.IndexWriterConfig; > import org.apache.lucene.index.Term; > import org.apache.lucene.queryparser.classic.MultiFieldQueryParser; > import org.apache.lucene.search.BooleanClause; > import org.apache.lucene.search.BooleanQuery; > import org.apache.lucene.search.IndexSearcher; > import 
org.apache.lucene.search.NumericRangeQuery; > import org.apache.lucene.search.Query; > import org.apache.lucene.search.ScoreDoc; > import org.apache.lucene.search.TopDocs; > import org.apache.lucene.util.BytesRef; > import org.h2.util.JdbcUtils; > import org.jetbrains.annotations.Nullable; > > import static > org.apache.ignite.internal.processors.query.QueryUtils.KEY_FIELD_NAME; > import static > org.apache.ignite.internal.processors.query.QueryUtils.VAL_FIELD_NAME; > > /** > * Lucene fulltext index. > */ > public class GridLuceneIndex implements AutoCloseable { > /** Field name for string representation of value. */ > public static final String VAL_STR_FIELD_NAME = "_gg_val_str__"; > > /** Field name for value version. */ > public static final String VER_FIELD_NAME = "_gg_ver__"; > > /** Field name for value expiration time. */ > public static final String EXPIRATION_TIME_FIELD_NAME = > "_gg_expires__"; > > /** */ > private final String cacheName; > > /** */ > private final GridQueryTypeDescriptor type; > > /** */ > private final IndexWriter writer; > > /** */ > private final String[] idxdFields; > > /** */ > private final AtomicLong updateCntr = new GridAtomicLong(); > > /** */ > private final GridLuceneDirectory dir; > > /** */ > private final GridKernalContext ctx; > > /** > * Constructor. > * > * @param ctx Kernal context. > * @param cacheName Cache name. > * @param type Type descriptor. > * @throws IgniteCheckedException If failed. 
> */ > public GridLuceneIndex(GridKernalContext ctx, @Nullable String > cacheName, GridQueryTypeDescriptor type) > throws IgniteCheckedException { > this.ctx = ctx; > this.cacheName = cacheName; > this.type = type; > > dir = new GridLuceneDirectory(new GridUnsafeMemory(0)); > > try { > writer = new IndexWriter(dir, new IndexWriterConfig(new > StandardAnalyzer())); > } > catch (IOException e) { > throw new IgniteCheckedException(e); > } > > GridQueryIndexDescriptor idx = type.textIndex(); > > if (idx != null) { > Collection<String> fields = idx.fields(); > > idxdFields = new String[fields.size() + 1]; > > fields.toArray(idxdFields); > } > else { > assert type.valueTextIndex() || type.valueClass() == > String.class; > > idxdFields = new String[1]; > } > > idxdFields[idxdFields.length - 1] = VAL_STR_FIELD_NAME; > } > > /** > * @return Cache object context. > */ > private CacheObjectContext objectContext() { > if (ctx == null) > return null; > > return > ctx.cache().internalCache(cacheName).context().cacheObjectContext(); > } > > /** > * Stores given data in this fulltext index. > * > * @param k Key. > * @param v Value. > * @param ver Version. > * @param expires Expiration time. > * @throws IgniteCheckedException If failed. > */ > @SuppressWarnings("ConstantConditions") > public void store(CacheObject k, CacheObject v, GridCacheVersion ver, > long expires) throws IgniteCheckedException { > CacheObjectContext coctx = objectContext(); > > Object key = k.isPlatformType() ? k.value(coctx, false) : k; > Object val = v.isPlatformType() ? 
v.value(coctx, false) : v; > > Document doc = new Document(); > > boolean stringsFound = false; > > if (type.valueTextIndex() || type.valueClass() == String.class) { > doc.add(new TextField(VAL_STR_FIELD_NAME, val.toString(), > Field.Store.YES)); > > stringsFound = true; > } > > for (int i = 0, last = idxdFields.length - 1; i < last; i++) { > Object fieldVal = type.value(idxdFields[i], key, val); > > if (fieldVal != null) { > doc.add(new TextField(idxdFields[i], fieldVal.toString(), > Field.Store.YES)); > > stringsFound = true; > } > } > > BytesRef keyByteRef = new BytesRef(k.valueBytes(coctx)); > > try { > final Term term = new Term(KEY_FIELD_NAME, keyByteRef); > > if (!stringsFound) { > writer.deleteDocuments(term); > > return; // We did not find any strings to be indexed, will > not store data at all. > } > > doc.add(new StringField(KEY_FIELD_NAME, keyByteRef, > Field.Store.YES)); > > if (type.valueClass() != String.class) > doc.add(new StoredField(VAL_FIELD_NAME, > v.valueBytes(coctx))); > > doc.add(new StoredField(VER_FIELD_NAME, > ver.toString().getBytes())); > > doc.add(new LongField(EXPIRATION_TIME_FIELD_NAME, expires, > Field.Store.YES)); > > // Next implies remove than add atomically operation. > writer.updateDocument(term, doc); > } > catch (IOException e) { > throw new IgniteCheckedException(e); > } > finally { > updateCntr.incrementAndGet(); > } > } > > /** > * Removes entry for given key from this index. > * > * @param key Key. > * @throws IgniteCheckedException If failed. > */ > public void remove(CacheObject key) throws IgniteCheckedException { > try { > writer.deleteDocuments(new Term(KEY_FIELD_NAME, > new BytesRef(key.valueBytes(objectContext())))); > } > catch (IOException e) { > throw new IgniteCheckedException(e); > } > finally { > updateCntr.incrementAndGet(); > } > } > > /** > * Runs lucene fulltext query over this index. > * > * @param qry Query. > * @param filters Filters over result. 
> * @param pageSize Size of batch > * @return Query result. > * @throws IgniteCheckedException If failed. > */ > public <K, V> GridCloseableIterator<IgniteBiTuple<K, V>> query(String > qry, IndexingQueryFilter filters, int pageSize) throws > IgniteCheckedException { > IndexReader reader; > > try { > long updates = updateCntr.get(); > > if (updates != 0) { > writer.commit(); > > updateCntr.addAndGet(-updates); > } > > //We can cache reader\searcher and change this to > 'openIfChanged' > reader = DirectoryReader.open(writer, true); > } > catch (IOException e) { > throw new IgniteCheckedException(e); > } > > IndexSearcher searcher; > > Query query; > > try { > searcher = new IndexSearcher(reader); > > MultiFieldQueryParser parser = new > MultiFieldQueryParser(idxdFields, > writer.getAnalyzer()); > > // parser.setAllowLeadingWildcard(true); > > // Filter expired items. > Query filter = > NumericRangeQuery.newLongRange(EXPIRATION_TIME_FIELD_NAME, > U.currentTimeMillis(), > null, false, false); > > query = new BooleanQuery.Builder() > .add(parser.parse(qry), BooleanClause.Occur.MUST) > .add(filter, BooleanClause.Occur.FILTER) > .build(); > } > catch (Exception e) { > U.closeQuiet(reader); > > throw new IgniteCheckedException(e); > } > > IndexingQueryCacheFilter fltr = null; > > if (filters != null) > fltr = filters.forCache(cacheName); > > return new It<>(reader, searcher, query, fltr, pageSize); > } > > /** {@inheritDoc} */ > @Override public void close() { > U.closeQuiet(writer); > U.close(dir, ctx.log(GridLuceneIndex.class)); > } > > /** > * Key-value iterator over fulltext search result. 
> */ > private class It<K, V> extends > GridCloseableIteratorAdapter<IgniteBiTuple<K, V>> { > private final int BatchPosBeforeHead = -1; > > /** */ > private static final long serialVersionUID = 0L; > > /** */ > private final int pageSize; > > /** */ > private final IndexReader reader; > > /** */ > private final Query query; > > /** */ > private final IndexSearcher searcher; > > /** current batch docs*/ > private ScoreDoc[] batch; > > /** current position in batch*/ > private int batchPos = BatchPosBeforeHead; > > /** */ > private final IndexingQueryCacheFilter filters; > > /** */ > private IgniteBiTuple<K, V> curr; > > /** */ > private CacheObjectContext coctx; > > /** > * Constructor. > * > * @param reader Reader. > * @param searcher Searcher. > * @param filters Filters over result. > * @throws IgniteCheckedException if failed. > */ > private It(IndexReader reader, IndexSearcher searcher, Query query, > IndexingQueryCacheFilter filters, int pageSize) > throws IgniteCheckedException { > this.reader = reader; > this.searcher = searcher; > this.filters = filters; > this.query = query; > this.pageSize = pageSize; > > coctx = objectContext(); > > findNext(); > } > > /** > * @param bytes Bytes. > * @param ldr Class loader. > * @return Object. > * @throws IgniteCheckedException If failed. > */ > @SuppressWarnings("unchecked") > private <Z> Z unmarshall(byte[] bytes, ClassLoader ldr) throws > IgniteCheckedException { > if (coctx == null) // For tests. > return (Z)JdbcUtils.deserialize(bytes, null); > > return (Z)coctx.kernalContext().cacheObjects().unmarshal(coctx, > bytes, ldr); > } > > /** > * Finds next element. > * > * @throws IgniteCheckedException If failed. 
> */ > @SuppressWarnings("unchecked") > private void findNext() throws IgniteCheckedException { > curr = null; > > if(isClosed()) > throw new IgniteCheckedException("Iterator already > closed"); > > if (shouldRequestNextBatch()) { > try { > requestNextBatch(); > } catch (IOException e) { > close(); > throw new IgniteCheckedException(e); > } > } > > if(batch == null) > return; > > while (batchPos < batch.length) { > Document doc; > ScoreDoc scoreDoc =batch[batchPos++]; > > try { > doc = searcher.doc(scoreDoc.doc); > } > catch (IOException e) { > throw new IgniteCheckedException(e); > } > > ClassLoader ldr = null; > > if (ctx != null && ctx.deploy().enabled()) > ldr = > ctx.cache().internalCache(cacheName).context().deploy().globalLoader(); > > K k = unmarshall(doc.getBinaryValue(KEY_FIELD_NAME).bytes, > ldr); > > if (filters != null && !filters.apply(k)) > continue; > > V v = type.valueClass() == String.class ? > (V)doc.get(VAL_STR_FIELD_NAME) : > > this.<V>unmarshall(doc.getBinaryValue(VAL_FIELD_NAME).bytes, ldr); > > assert v != null; > > curr = new IgniteBiTuple<>(k, v); > > break; > } > } > > private boolean shouldRequestNextBatch() { > if(batch == null){ > // should request for first batch > return (batchPos == BatchPosBeforeHead) ; > } else { > // should request when reached to the end of batch > return (batchPos == batch.length); > } > } > > private void requestNextBatch() throws IOException { > TopDocs docs; > > if (batch == null) { > docs = searcher.search(query, pageSize); > } else { > docs = searcher.searchAfter(batch[batch.length - 1], query, > pageSize); > } > > if(docs.scoreDocs.length ==0) { > batch = null; > }else{ > batch = docs.scoreDocs; > } > > batchPos = 0; > } > > /** {@inheritDoc} */ > @Override protected IgniteBiTuple<K, V> onNext() throws > IgniteCheckedException { > IgniteBiTuple<K, V> res = curr; > > findNext(); > > return res; > } > > /** {@inheritDoc} */ > @Override protected boolean onHasNext() throws > IgniteCheckedException { > return 
curr != null; > } > > /** {@inheritDoc} */ > @Override protected void onClose() throws IgniteCheckedException { > U.closeQuiet(reader); > } > } > } > ``` > > On Fri, Sep 7, 2018 at 9:02 AM Tâm Nguyễn Mạnh <nguyenmanhtam...@gmail.com > > > wrote: > > > Hi, > > > > I tried to implement iterator for GridLuceneInde, could you please help > to > > review ? > > > > -- > > Thanks & Best Regards > > > > Tam, Nguyen Manh > > > > > > -- > Thanks & Best Regards > > Tam, Nguyen Manh >