Hi There

Our app spends a lot of time waiting for Lucene to finish writing to the index, and I'd like to minimize this. If you have a moment to spare, please let me know whether the LuceneIndex class presented below can be improved.

It is used in the following way:

luceneIndex = new LuceneIndex(Config.getConfig().getIndex().getIndexBacklog(), exitReq, volume.getID() + " indexer", volume.getIndexPath(), Config.getConfig().getIndex().getMaxSimultaneousDocs());
// startup() launches the background IndexProcessor thread. Without it nothing drains the queue,
// and indexDocument() blocks once the queue (sized by the index backlog) is full.
// Call luceneIndex.shutdown() when finished so the processor thread exits.
luceneIndex.startup();
Document doc = new Document();
// ... populate doc fields ...
IndexInfo indexInfo = new IndexInfo(doc);
// Only enqueues the document; the processor thread performs the actual Lucene write.
luceneIndex.indexDocument(indexInfo);

As an aside, is there any way for Lucene to support simultaneous writes to an index? For example, each writer thread could write to a separate shard, and after a period the shards could be merged into a single index. Or is this overkill? I am interested to hear the opinion of the Lucene experts.

Thanks in advance

Jamie

package com.stimulus.archiva.index;

import java.io.File;
import java.io.IOException;
import java.io.PrintStream;
import org.apache.commons.logging.*;
import org.apache.lucene.document.Document;
import org.apache.lucene.index.*;
import org.apache.lucene.store.FSDirectory;
import java.util.*;
import org.apache.lucene.store.LockObtainFailedException;
import org.apache.lucene.store.AlreadyClosedException;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.*;

public class LuceneIndex extends Thread {
protected ArrayBlockingQueue<LuceneDocument> queue; protected static final Log logger = LogFactory.getLog(LuceneIndex.class.getName()); protected static final Log indexLog = LogFactory.getLog("indexlog");
           IndexWriter writer = null;
           protected static ScheduledExecutorService scheduler;
        protected static ScheduledFuture<?> scheduledTask;
        protected LuceneDocument EXIT_REQ = null;
        ReentrantLock indexLock = new ReentrantLock();
        ArchivaAnalyzer analyzer     = new ArchivaAnalyzer();
        File indexLogFile;
        PrintStream indexLogOut;
        IndexProcessor indexProcessor;
        String friendlyName;
        String indexPath;
        int maxSimultaneousDocs;
public LuceneIndex(int queueSize, LuceneDocument exitReq, String friendlyName, String indexPath, int maxSimultaneousDocs) { this.queue = new ArrayBlockingQueue<LuceneDocument>(queueSize);
               this.EXIT_REQ = exitReq;
               this.friendlyName = friendlyName;
               this.indexPath = indexPath;
               this.maxSimultaneousDocs = maxSimultaneousDocs;
               setLog(friendlyName);
           }
public int getMaxSimultaneousDocs() {
             return maxSimultaneousDocs;
         }
public void setMaxSimultaneousDocs(int maxSimultaneousDocs) {
             this.maxSimultaneousDocs = maxSimultaneousDocs;
         }
public ReentrantLock getIndexLock() {
             return indexLock;
         }
protected void setLog(String logName) {

               try {
                   indexLogFile = getIndexLogFile(logName);
                   if (indexLogFile!=null) {
                       if (indexLogFile.length()>10485760)
                           indexLogFile.delete();
                       indexLogOut = new PrintStream(indexLogFile);
                   }
logger.debug("set index log file path {path='"+indexLogFile.getCanonicalPath()+"'}");
               } catch (Exception e) {
logger.error("failed to open index log file:"+e.getMessage(),e);
               }
         }
protected File getIndexLogFile(String logName) {
              try {
String logfilepath = Config.getFileSystem().getLogPath()+File.separator+logName+"index.log";
                   return new File(logfilepath);
               } catch (Exception e) {
logger.error("failed to open index log file:"+e.getMessage(),e);
                   return null;
               }
         }
protected void openIndex() throws MessageSearchException {
           Exception lastError = null;
if (writer==null) { logger.debug("openIndex() index "+friendlyName+" will be opened. it is currently closed.");
           } else {
logger.debug("openIndex() did not bother opening index "+friendlyName+". it is already open.");
               return;
           }
           logger.debug("opening index "+friendlyName+" for write");
logger.debug("opening search index "+friendlyName+" for write {indexpath='"+indexPath+"'}");
           boolean writelock;
           int attempt = 0;
           int maxattempt = 10;
if (Config.getConfig().getIndex().getMultipleIndexProcesses()) {
               maxattempt = 10000;
            } else {
               maxattempt = 10;
            }
do {
               writelock = false;
               try {
FSDirectory fsDirectory = FSDirectory.getDirectory(indexPath); int maxIndexChars = Config.getConfig().getIndex().getMaxIndexPerFieldChars(); writer = new IndexWriter(fsDirectory,analyzer,new IndexWriter.MaxFieldLength(maxIndexChars)); if (indexLog.isDebugEnabled() && indexLogOut!=null) {
                           writer.setInfoStream(indexLogOut);
                       }
               } catch (LockObtainFailedException lobfe) {
logger.debug("write lock on index "+friendlyName+". will reopen in 50ms.");
                       try { Thread.sleep(50); } catch (Exception e) {}
                       attempt++;
                       writelock = true;
               } catch (CorruptIndexException cie) {
throw new MessageSearchException("index "+friendlyName+" appears to be corrupt. please reindex the active volume."+cie.getMessage(),logger);
               } catch (Throwable io) {
throw new MessageSearchException("failed to write document to index "+friendlyName+":"+io.getMessage(),logger);
               }
          } while (writelock && attempt<maxattempt);
          if (attempt>=10000)
throw new MessageSearchException("failed to open index "+friendlyName+" writer {indexPath='"+indexPath+"'}",lastError,logger);
       }
public void indexDocument(LuceneDocument luceneDocument) throws MessageSearchException {
           logger.debug("index document {"+luceneDocument+"}");
           long s = (new Date()).getTime();
           if (luceneDocument == null)
throw new MessageSearchException("assertion failure: null document",logger);
           try {
               queue.put(luceneDocument);
           } catch (InterruptedException ie) {
throw new MessageSearchException("failed to add document to queue:"+ie.getMessage(),ie,logger);
           }
logger.debug("document indexed successfully {"+luceneDocument+"}"); logger.debug("indexing message end {"+luceneDocument+"}");
           long e = (new Date()).getTime();
           logger.debug("indexing time {time='"+(e-s)+"'}");
       }
public class IndexProcessor extends Thread { public IndexProcessor() {
               setName("index processor");
           }
public void run() {
               boolean exit = false;
               //ExecutorService documentPool;
// we abandoned pool as it does not seem to offer any major performance benefit LuceneDocument luceneDocument = null; LinkedList<LuceneDocument> pushbacks = new LinkedList<LuceneDocument>(); while (!exit) { try { //documentPool = Executors.newFixedThreadPool(Config.getConfig().getArchiver().getArchiveThreads()); luceneDocument = null; luceneDocument = (LuceneDocument) queue.take(); indexLock.lock(); if (luceneDocument==EXIT_REQ) { logger.debug("index exit req received. exiting");
                           exit = true;
                           continue;
                       }
try {
                            openIndex();
                       } catch (Exception e) {
logger.error("failed to open index:"+e.getMessage(),e);
                            return;
                       }
                       if (luceneDocument==null) {
                           logger.debug("index info is null");
                       }
                       int i = 0;
while(luceneDocument!=null && i<maxSimultaneousDocs) {
                           try {
                               Document doc = luceneDocument.getDocument();
                               String language = doc.get("lang");
                               if (language==null) {
language = Config.getConfig().getIndex().getIndexLanguage();
                               }
writer.addDocument(doc,AnalyzerFactory.getAnalyzer(language,AnalyzerFactory.Operation.INDEX));
                           } catch (IOException io) {
logger.error("failed to add document to index:"+io.getMessage(),io);
                           } catch (AlreadyClosedException e) {
                               pushbacks.add(luceneDocument);
                               break;
                           }
//documentPool.execute(new IndexDocument(luceneDocument,pushbacks)); i++;
                            if (i<maxSimultaneousDocs) {
luceneDocument = (LuceneDocument) queue.poll(); if (luceneDocument==null) {
                                       logger.debug("index info is null");
                                }
if (luceneDocument==EXIT_REQ) { logger.debug("index exit req received. exiting (2)");
                                       exit = true;
                                       break;
                                 }
                            }
}
                       if (pushbacks.size()>0) {
                             closeIndex();
                             try {
                                    openIndex();
                             } catch (Exception e) {
logger.error("failed to open index:"+e.getMessage(),e);
                                return;
                             }
                             for (LuceneDocument pushback : pushbacks) {
                                   try {
writer.addDocument(pushback.getDocument());
                                   } catch (IOException io) {
logger.error("failed to add document to index:"+io.getMessage(),io);
                                   } catch (AlreadyClosedException e) {
                                       pushbacks.add(pushback);
                                   }
//documentPool.execute(new IndexDocument(pushback,pushbacks));
                                   i++;
                             }
                       }
//documentPool.shutdown(); //documentPool.awaitTermination(30,TimeUnit.MINUTES); } catch (Throwable ie) { logger.error("index write interrupted:"+ie.getMessage());
                    } finally {
                          closeIndex();
                         indexLock.unlock();
                   }
} } public class IndexDocument extends Thread { LuceneDocument luceneDocument = null;
                   List<LuceneDocument> pushbacks = null;
public IndexDocument(LuceneDocument luceneDocument,List<LuceneDocument> pushbacks) {
                       this.luceneDocument = luceneDocument;
                       this.pushbacks = pushbacks;
                       setName("index document");
                   }
public void run() {
                       try {
writer.addDocument(luceneDocument.getDocument());
                       } catch (IOException io) {
logger.error("failed to add document to index:"+io.getMessage(),io);
                       } catch (AlreadyClosedException e) {
                           pushbacks.add(luceneDocument);
                       } catch (Throwable t) {
logger.error("failed to add document to index:"+t.getMessage(),t);
                       }
                   }};
           }
protected void closeIndex() {
            try {
                indexLock.lock();
                if (writer!=null) {
                   writer.close();
                }
            } catch (Throwable io) {
logger.error("failed to close index writer:"+io.getMessage(),io);
            } finally {
                logger.debug("writer closed");
                writer = null;
                indexLock.unlock();
            }
       }
public void deleteIndex() throws MessageSearchException {
                  logger.debug("delete index {indexpath='"+indexPath+"'}");
                  try {
                     indexLock.lock();
                    try {
int maxIndexChars = Config.getConfig().getIndex().getMaxIndexPerFieldChars(); writer = new IndexWriter(FSDirectory.getDirectory(indexPath),analyzer,true,new IndexWriter.MaxFieldLength(maxIndexChars));
                    } catch (Throwable cie) {
logger.error("failed to delete index {index='"+indexPath+"'}",cie);
                        return;
                    }
                    MessageIndex.volumeIndexes.remove(this);
                 } finally {
                       closeIndex();
                     indexLock.unlock();
               }
         }
public void startup() {
           logger.debug("volumeindex is starting up");
File lockFile = new File(indexPath+File.separatorChar + "write.lock");
           if (lockFile.exists()) {
logger.warn("The server lock file already exists. Either another indexer is running or the server was not shutdown correctly."); logger.warn("If it is the latter, the lock file must be manually deleted at "+lockFile.getAbsolutePath()); if (Config.getConfig().getIndex().getMultipleIndexProcesses()) { logger.debug("index lock file detected on volumeindex startup.");
               } else {
logger.warn("index lock file detected. the server was shutdown incorrectly. automatically deleting lock file."); logger.warn("indexer is configured to deal with only one indexer process."); logger.warn("if you are running more than one indexer, your index could be subject to corruption.");
                   lockFile.delete();
               }
           }
           indexProcessor = new IndexProcessor();
           indexProcessor.start();
           Runtime.getRuntime().addShutdownHook(this);
} public void shutdown() {
             logger.debug("volumeindex is shutting down");
             queue.add(EXIT_REQ);
             scheduler.shutdownNow();
} @Override
         public void run() {
             queue.add(EXIT_REQ);
         }
public interface LuceneDocument { public String toString();
             public Document getDocument();
             public void finalize();
} }




---------------------------------------------------------------------
To unsubscribe, e-mail: java-user-unsubscr...@lucene.apache.org
For additional commands, e-mail: java-user-h...@lucene.apache.org

Reply via email to