Hello!

Can you please submit it as a GitHub pull request, as per our process? Do you
have a ticket for that?

Regards,
-- 
Ilya Kasnacheev


пт, 7 сент. 2018 г. в 5:08, Tâm Nguyễn Mạnh <nguyenmanhtam...@gmail.com>:

>
> modules\indexing\src\main\java\org\apache\ignite\internal\processors\query\h2\opt\GridLuceneIndex.java
> ```java
> /*
>  * Licensed to the Apache Software Foundation (ASF) under one or more
>  * contributor license agreements.  See the NOTICE file distributed with
>  * this work for additional information regarding copyright ownership.
>  * The ASF licenses this file to You under the Apache License, Version 2.0
>  * (the "License"); you may not use this file except in compliance with
>  * the License.  You may obtain a copy of the License at
>  *
>  *      http://www.apache.org/licenses/LICENSE-2.0
>  *
>  * Unless required by applicable law or agreed to in writing, software
>  * distributed under the License is distributed on an "AS IS" BASIS,
>  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>  * See the License for the specific language governing permissions and
>  * limitations under the License.
>  */
>
> package org.apache.ignite.internal.processors.query.h2.opt;
>
> import java.io.IOException;
> import java.util.Collection;
> import java.util.concurrent.atomic.AtomicLong;
> import org.apache.ignite.IgniteCheckedException;
> import org.apache.ignite.internal.GridKernalContext;
> import org.apache.ignite.internal.processors.cache.CacheObject;
> import org.apache.ignite.internal.processors.cache.CacheObjectContext;
> import
> org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
> import
> org.apache.ignite.internal.processors.query.GridQueryIndexDescriptor;
> import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
> import org.apache.ignite.internal.util.GridAtomicLong;
> import org.apache.ignite.internal.util.GridCloseableIteratorAdapter;
> import org.apache.ignite.internal.util.lang.GridCloseableIterator;
> import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory;
> import org.apache.ignite.internal.util.typedef.internal.U;
> import org.apache.ignite.lang.IgniteBiTuple;
> import org.apache.ignite.spi.indexing.IndexingQueryFilter;
> import org.apache.ignite.spi.indexing.IndexingQueryCacheFilter;
> import org.apache.lucene.analysis.standard.StandardAnalyzer;
> import org.apache.lucene.document.Document;
> import org.apache.lucene.document.Field;
> import org.apache.lucene.document.LongField;
> import org.apache.lucene.document.StoredField;
> import org.apache.lucene.document.StringField;
> import org.apache.lucene.document.TextField;
> import org.apache.lucene.index.DirectoryReader;
> import org.apache.lucene.index.IndexReader;
> import org.apache.lucene.index.IndexWriter;
> import org.apache.lucene.index.IndexWriterConfig;
> import org.apache.lucene.index.Term;
> import org.apache.lucene.queryparser.classic.MultiFieldQueryParser;
> import org.apache.lucene.search.BooleanClause;
> import org.apache.lucene.search.BooleanQuery;
> import org.apache.lucene.search.IndexSearcher;
> import org.apache.lucene.search.NumericRangeQuery;
> import org.apache.lucene.search.Query;
> import org.apache.lucene.search.ScoreDoc;
> import org.apache.lucene.search.TopDocs;
> import org.apache.lucene.util.BytesRef;
> import org.h2.util.JdbcUtils;
> import org.jetbrains.annotations.Nullable;
>
> import static
> org.apache.ignite.internal.processors.query.QueryUtils.KEY_FIELD_NAME;
> import static
> org.apache.ignite.internal.processors.query.QueryUtils.VAL_FIELD_NAME;
>
> /**
>  * Lucene fulltext index.
>  */
> public class GridLuceneIndex implements AutoCloseable {
>     /** Field name for string representation of value. */
>     public static final String VAL_STR_FIELD_NAME = "_gg_val_str__";
>
>     /** Field name for value version. */
>     public static final String VER_FIELD_NAME = "_gg_ver__";
>
>     /** Field name for value expiration time. */
>     public static final String EXPIRATION_TIME_FIELD_NAME =
> "_gg_expires__";
>
>     /** Cache name; used to look up the internal cache and the per-cache query filter. May be {@code null}. */
>     private final String cacheName;
>
>     /** Type descriptor; supplies the text index definition, the value class and indexed field values. */
>     private final GridQueryTypeDescriptor type;
>
>     /** Lucene writer through which documents are updated, deleted and committed. */
>     private final IndexWriter writer;
>
>     /** Names of the fields searched by queries; the last element is always {@link #VAL_STR_FIELD_NAME}. */
>     private final String[] idxdFields;
>
>     /** Counts store/remove calls; a query commits the writer and subtracts the count it observed. */
>     private final AtomicLong updateCntr = new GridAtomicLong();
>
>     /** Lucene directory the writer works on; closed together with the index. */
>     private final GridLuceneDirectory dir;
>
>     /** Kernal context; other members of this class check it for {@code null}. */
>     private final GridKernalContext ctx;
>
>     /**
>      * Creates the index: opens a Lucene writer over a fresh directory and works out
>      * which field names fulltext queries will search.
>      *
>      * @param ctx Kernal context.
>      * @param cacheName Cache name.
>      * @param type Type descriptor.
>      * @throws IgniteCheckedException If the Lucene writer could not be created.
>      */
>     public GridLuceneIndex(GridKernalContext ctx, @Nullable String cacheName, GridQueryTypeDescriptor type)
>         throws IgniteCheckedException {
>         this.ctx = ctx;
>         this.cacheName = cacheName;
>         this.type = type;
>
>         dir = new GridLuceneDirectory(new GridUnsafeMemory(0));
>
>         try {
>             IndexWriterConfig writerCfg = new IndexWriterConfig(new StandardAnalyzer());
>
>             writer = new IndexWriter(dir, writerCfg);
>         }
>         catch (IOException e) {
>             throw new IgniteCheckedException(e);
>         }
>
>         GridQueryIndexDescriptor txtIdx = type.textIndex();
>
>         String[] names;
>
>         if (txtIdx == null) {
>             // No explicit text index: only the string form of the value is searchable.
>             assert type.valueTextIndex() || type.valueClass() == String.class;
>
>             names = new String[1];
>         }
>         else {
>             Collection<String> declared = txtIdx.fields();
>
>             // One extra slot at the end is reserved for the value-string field.
>             names = new String[declared.size() + 1];
>
>             declared.toArray(names);
>         }
>
>         names[names.length - 1] = VAL_STR_FIELD_NAME;
>
>         idxdFields = names;
>     }
>
>     /**
>      * Resolves the cache object context of the cache this index belongs to.
>      *
>      * @return Cache object context, or {@code null} when there is no kernal context.
>      */
>     private CacheObjectContext objectContext() {
>         return ctx == null
>             ? null
>             : ctx.cache().internalCache(cacheName).context().cacheObjectContext();
>     }
>
>     /**
>      * Stores given data in this fulltext index.
>      *
>      * @param k Key.
>      * @param v Value.
>      * @param ver Version.
>      * @param expires Expiration time.
>      * @throws IgniteCheckedException If failed.
>      */
>     @SuppressWarnings("ConstantConditions")
>     public void store(CacheObject k, CacheObject v, GridCacheVersion ver,
> long expires) throws IgniteCheckedException {
>         CacheObjectContext coctx = objectContext();
>
>         Object key = k.isPlatformType() ? k.value(coctx, false) : k;
>         Object val = v.isPlatformType() ? v.value(coctx, false) : v;
>
>         Document doc = new Document();
>
>         boolean stringsFound = false;
>
>         if (type.valueTextIndex() || type.valueClass() == String.class) {
>             doc.add(new TextField(VAL_STR_FIELD_NAME, val.toString(),
> Field.Store.YES));
>
>             stringsFound = true;
>         }
>
>         for (int i = 0, last = idxdFields.length - 1; i < last; i++) {
>             Object fieldVal = type.value(idxdFields[i], key, val);
>
>             if (fieldVal != null) {
>                 doc.add(new TextField(idxdFields[i], fieldVal.toString(),
> Field.Store.YES));
>
>                 stringsFound = true;
>             }
>         }
>
>         BytesRef keyByteRef = new BytesRef(k.valueBytes(coctx));
>
>         try {
>             final Term term = new Term(KEY_FIELD_NAME, keyByteRef);
>
>             if (!stringsFound) {
>                 writer.deleteDocuments(term);
>
>                 return; // We did not find any strings to be indexed, will
> not store data at all.
>             }
>
>             doc.add(new StringField(KEY_FIELD_NAME, keyByteRef,
> Field.Store.YES));
>
>             if (type.valueClass() != String.class)
>                 doc.add(new StoredField(VAL_FIELD_NAME,
> v.valueBytes(coctx)));
>
>             doc.add(new StoredField(VER_FIELD_NAME,
> ver.toString().getBytes()));
>
>             doc.add(new LongField(EXPIRATION_TIME_FIELD_NAME, expires,
> Field.Store.YES));
>
>             // Next implies remove than add atomically operation.
>             writer.updateDocument(term, doc);
>         }
>         catch (IOException e) {
>             throw new IgniteCheckedException(e);
>         }
>         finally {
>             updateCntr.incrementAndGet();
>         }
>     }
>
>     /**
>      * Deletes the document stored for the given key, if any, and records the
>      * update so that the next query commits it.
>      *
>      * @param key Key.
>      * @throws IgniteCheckedException If failed.
>      */
>     public void remove(CacheObject key) throws IgniteCheckedException {
>         try {
>             BytesRef keyBytes = new BytesRef(key.valueBytes(objectContext()));
>
>             Term keyTerm = new Term(KEY_FIELD_NAME, keyBytes);
>
>             writer.deleteDocuments(keyTerm);
>         }
>         catch (IOException e) {
>             throw new IgniteCheckedException(e);
>         }
>         finally {
>             updateCntr.incrementAndGet();
>         }
>     }
>
>     /**
>      * Runs lucene fulltext query over this index.
>      *
>      * @param qry Query.
>      * @param filters Filters over result.
>      * @param pageSize Size of batch
>      * @return Query result.
>      * @throws IgniteCheckedException If failed.
>      */
>     public <K, V> GridCloseableIterator<IgniteBiTuple<K, V>> query(String
> qry, IndexingQueryFilter filters, int pageSize) throws
> IgniteCheckedException {
>         IndexReader reader;
>
>         try {
>             long updates = updateCntr.get();
>
>             if (updates != 0) {
>                 writer.commit();
>
>                 updateCntr.addAndGet(-updates);
>             }
>
>             //We can cache reader\searcher and change this to
> 'openIfChanged'
>             reader = DirectoryReader.open(writer, true);
>         }
>         catch (IOException e) {
>             throw new IgniteCheckedException(e);
>         }
>
>         IndexSearcher searcher;
>
>         Query query;
>
>         try {
>             searcher = new IndexSearcher(reader);
>
>             MultiFieldQueryParser parser = new
> MultiFieldQueryParser(idxdFields,
>                 writer.getAnalyzer());
>
> //            parser.setAllowLeadingWildcard(true);
>
>             // Filter expired items.
>             Query filter =
> NumericRangeQuery.newLongRange(EXPIRATION_TIME_FIELD_NAME,
> U.currentTimeMillis(),
>                 null, false, false);
>
>             query = new BooleanQuery.Builder()
>                 .add(parser.parse(qry), BooleanClause.Occur.MUST)
>                 .add(filter, BooleanClause.Occur.FILTER)
>                 .build();
>         }
>         catch (Exception e) {
>             U.closeQuiet(reader);
>
>             throw new IgniteCheckedException(e);
>         }
>
>         IndexingQueryCacheFilter fltr = null;
>
>         if (filters != null)
>             fltr = filters.forCache(cacheName);
>
>         return new It<>(reader, searcher, query, fltr, pageSize);
>     }
>
>     /**
>      * {@inheritDoc}
>      * <p>
>      * Closes the writer and then the directory. The kernal context may be {@code null}
>      * (other members of this class check for that), in which case no logger is
>      * available and the directory is closed quietly.
>      */
>     @Override public void close() {
>         U.closeQuiet(writer);
>
>         if (ctx != null)
>             U.close(dir, ctx.log(GridLuceneIndex.class));
>         else
>             U.closeQuiet(dir);
>     }
>
>     /**
>      * Key-value iterator over fulltext search result.
>      */
>     private class It<K, V> extends
> GridCloseableIteratorAdapter<IgniteBiTuple<K, V>> {
>         private final int BatchPosBeforeHead = -1;
>
>         /** */
>         private static final long serialVersionUID = 0L;
>
>         /** */
>         private final int pageSize;
>
>         /** */
>         private final IndexReader reader;
>
>         /** */
>         private final Query query;
>
>         /** */
>         private final IndexSearcher searcher;
>
>         /** current batch docs*/
>         private ScoreDoc[] batch;
>
>         /** current position in batch*/
>         private int batchPos = BatchPosBeforeHead;
>
>         /** */
>         private final IndexingQueryCacheFilter filters;
>
>         /** */
>         private IgniteBiTuple<K, V> curr;
>
>         /** */
>         private CacheObjectContext coctx;
>
>         /**
>          * Constructor.
>          *
>          * @param reader Reader.
>          * @param searcher Searcher.
>          * @param filters Filters over result.
>          * @throws IgniteCheckedException if failed.
>          */
>         private It(IndexReader reader, IndexSearcher searcher, Query query,
> IndexingQueryCacheFilter filters, int pageSize)
>             throws IgniteCheckedException {
>             this.reader = reader;
>             this.searcher = searcher;
>             this.filters = filters;
>             this.query = query;
>             this.pageSize = pageSize;
>
>             coctx = objectContext();
>
>             findNext();
>         }
>
>         /**
>          * @param bytes Bytes.
>          * @param ldr Class loader.
>          * @return Object.
>          * @throws IgniteCheckedException If failed.
>          */
>         @SuppressWarnings("unchecked")
>         private <Z> Z unmarshall(byte[] bytes, ClassLoader ldr) throws
> IgniteCheckedException {
>             if (coctx == null) // For tests.
>                 return (Z)JdbcUtils.deserialize(bytes, null);
>
>             return (Z)coctx.kernalContext().cacheObjects().unmarshal(coctx,
> bytes, ldr);
>         }
>
>         /**
>          * Finds next element.
>          *
>          * @throws IgniteCheckedException If failed.
>          */
>         @SuppressWarnings("unchecked")
>         private void findNext() throws IgniteCheckedException {
>             curr = null;
>
>             if(isClosed())
>                 throw new IgniteCheckedException("Iterator already
> closed");
>
>             if (shouldRequestNextBatch()) {
>                 try {
>                     requestNextBatch();
>                 } catch (IOException e) {
>                     close();
>                     throw new IgniteCheckedException(e);
>                 }
>             }
>
>             if(batch == null)
>                 return;
>
>             while (batchPos < batch.length) {
>                 Document doc;
>                 ScoreDoc scoreDoc =batch[batchPos++];
>
>                 try {
>                     doc = searcher.doc(scoreDoc.doc);
>                 }
>                 catch (IOException e) {
>                     throw new IgniteCheckedException(e);
>                 }
>
>                 ClassLoader ldr = null;
>
>                 if (ctx != null && ctx.deploy().enabled())
>                     ldr =
> ctx.cache().internalCache(cacheName).context().deploy().globalLoader();
>
>                 K k = unmarshall(doc.getBinaryValue(KEY_FIELD_NAME).bytes,
> ldr);
>
>                 if (filters != null && !filters.apply(k))
>                     continue;
>
>                 V v = type.valueClass() == String.class ?
>                     (V)doc.get(VAL_STR_FIELD_NAME) :
>
> this.<V>unmarshall(doc.getBinaryValue(VAL_FIELD_NAME).bytes, ldr);
>
>                 assert v != null;
>
>                 curr = new IgniteBiTuple<>(k, v);
>
>                 break;
>             }
>         }
>
>         private boolean shouldRequestNextBatch()  {
>             if(batch == null){
>                 // should request for first batch
>                 return (batchPos == BatchPosBeforeHead) ;
>             } else {
>                 // should request when reached to the end of batch
>                 return (batchPos  == batch.length);
>             }
>         }
>
>         private void requestNextBatch() throws IOException {
>             TopDocs docs;
>
>             if (batch == null) {
>                 docs = searcher.search(query, pageSize);
>             } else {
>                 docs = searcher.searchAfter(batch[batch.length - 1], query,
> pageSize);
>             }
>
>             if(docs.scoreDocs.length ==0) {
>                 batch = null;
>             }else{
>                 batch = docs.scoreDocs;
>             }
>
>             batchPos = 0;
>         }
>
>         /** {@inheritDoc} */
>         @Override protected IgniteBiTuple<K, V> onNext() throws
> IgniteCheckedException {
>             IgniteBiTuple<K, V> res = curr;
>
>             findNext();
>
>             return res;
>         }
>
>         /** {@inheritDoc} */
>         @Override protected boolean onHasNext() throws
> IgniteCheckedException {
>             return curr != null;
>         }
>
>         /** {@inheritDoc} */
>         @Override protected void onClose() throws IgniteCheckedException {
>             U.closeQuiet(reader);
>         }
>     }
> }
> ```
>
> On Fri, Sep 7, 2018 at 9:02 AM Tâm Nguyễn Mạnh <nguyenmanhtam...@gmail.com
> >
> wrote:
>
> > Hi,
> >
> > I tried to implement an iterator for GridLuceneIndex; could you please help
> > review it?
> >
> > --
> > Thanks & Best Regards
> >
> > Tam, Nguyen Manh
> >
> >
>
> --
> Thanks & Best Regards
>
> Tam, Nguyen Manh
>

Reply via email to