You might try re-implementing this using ThreadPoolExecutor: http://java.sun.com/j2se/1.5.0/docs/api/java/util/concurrent/ThreadPoolExecutor.html
glen 2009/11/10 Jamie Band <ja...@stimulussoft.com>: > Hi There > > Our app spends a lot of time waiting for Lucene to finish writing to the > index. I'd like to minimize this. If you have a moment to spare, please let > me know if my LuceneIndex class presented below can be improved upon. > > It is used in the following way: > > luceneIndex = new > LuceneIndex(Config.getConfig().getIndex().getIndexBacklog(), > exitReq,volume.getID()+" > indexer",volume.getIndexPath(), > > Config.getConfig().getIndex().getMaxSimultaneousDocs()); > Document doc = new Document(); > IndexInfo indexInfo = new IndexInfo(doc); > luceneIndex.indexDocument(indexInfo); > > As an aside, is there any way for Lucene to support simultaneous writes > to an index? For example, each write thread could write to a separate > shard, and after a period the shards could be merged into a single index? Or is > this overkill? I am interested to hear the opinion of the Lucene experts. > > Thanks in advance > > Jamie > > package com.stimulus.archiva.index; > > import java.io.File; > import java.io.IOException; > import java.io.PrintStream; > import org.apache.commons.logging.*; > import org.apache.lucene.document.Document; > import org.apache.lucene.index.*; > import org.apache.lucene.store.FSDirectory; > import java.util.*; > import org.apache.lucene.store.LockObtainFailedException; > import org.apache.lucene.store.AlreadyClosedException; > import java.util.concurrent.locks.ReentrantLock; > import java.util.concurrent.*; > > public class LuceneIndex extends Thread { > protected ArrayBlockingQueue<LuceneDocument> queue; > protected static final Log logger = > LogFactory.getLog(LuceneIndex.class.getName()); > protected static final Log indexLog = LogFactory.getLog("indexlog"); > IndexWriter writer = null; > protected static ScheduledExecutorService scheduler; > protected static ScheduledFuture<?> scheduledTask; > protected LuceneDocument EXIT_REQ = null; > ReentrantLock indexLock = new ReentrantLock(); >
ArchivaAnalyzer analyzer = new ArchivaAnalyzer(); > File indexLogFile; > PrintStream indexLogOut; > IndexProcessor indexProcessor; > String friendlyName; > String indexPath; > int maxSimultaneousDocs; > public LuceneIndex(int queueSize, LuceneDocument exitReq, > String friendlyName, String indexPath, int > maxSimultaneousDocs) { > this.queue = new > ArrayBlockingQueue<LuceneDocument>(queueSize); > this.EXIT_REQ = exitReq; > this.friendlyName = friendlyName; > this.indexPath = indexPath; > this.maxSimultaneousDocs = maxSimultaneousDocs; > setLog(friendlyName); > } > public int getMaxSimultaneousDocs() { > return maxSimultaneousDocs; > } > public void setMaxSimultaneousDocs(int maxSimultaneousDocs) > { > this.maxSimultaneousDocs = maxSimultaneousDocs; > } > public ReentrantLock getIndexLock() { > return indexLock; > } > protected void setLog(String logName) { > > try { > indexLogFile = getIndexLogFile(logName); > if (indexLogFile!=null) { > if (indexLogFile.length()>10485760) > indexLogFile.delete(); > indexLogOut = new PrintStream(indexLogFile); > } > logger.debug("set index log file path > {path='"+indexLogFile.getCanonicalPath()+"'}"); > } catch (Exception e) { > logger.error("failed to open index log > file:"+e.getMessage(),e); > } > } > protected File getIndexLogFile(String logName) { > try { > String logfilepath = > Config.getFileSystem().getLogPath()+File.separator+logName+"index.log"; > return new File(logfilepath); > } catch (Exception e) { > logger.error("failed to open index log > file:"+e.getMessage(),e); > return null; > } > } > protected void openIndex() throws > MessageSearchException { > Exception lastError = null; > if (writer==null) { > logger.debug("openIndex() index "+friendlyName+" will be > opened. it is currently closed."); > } else { > logger.debug("openIndex() did not bother opening index > "+friendlyName+". 
it is already open."); > return; > } > logger.debug("opening index "+friendlyName+" for write"); > logger.debug("opening search index "+friendlyName+" for write > {indexpath='"+indexPath+"'}"); > boolean writelock; > int attempt = 0; > int maxattempt = 10; > if > (Config.getConfig().getIndex().getMultipleIndexProcesses()) { > maxattempt = 10000; > } else { > maxattempt = 10; > } > do { > writelock = false; > try { > FSDirectory fsDirectory = > FSDirectory.getDirectory(indexPath); > int maxIndexChars = > Config.getConfig().getIndex().getMaxIndexPerFieldChars(); > writer = new IndexWriter(fsDirectory,analyzer,new > IndexWriter.MaxFieldLength(maxIndexChars)); > if (indexLog.isDebugEnabled() && indexLogOut!=null) { > writer.setInfoStream(indexLogOut); > } > } catch (LockObtainFailedException lobfe) { > logger.debug("write lock on index "+friendlyName+". > will reopen in 50ms."); > try { Thread.sleep(50); } catch (Exception e) {} > attempt++; > writelock = true; > } catch (CorruptIndexException cie) { > throw new MessageSearchException("index "+friendlyName+" > appears to be corrupt. 
please reindex the active > volume."+cie.getMessage(),logger); > } catch (Throwable io) { > throw new MessageSearchException("failed to write document > to index "+friendlyName+":"+io.getMessage(),logger); > } > } while (writelock && attempt<maxattempt); > if (attempt>=10000) > throw new MessageSearchException("failed to open index > "+friendlyName+" writer {indexPath='"+indexPath+"'}",lastError,logger); > } > public void indexDocument(LuceneDocument luceneDocument) throws > MessageSearchException { > logger.debug("index document {"+luceneDocument+"}"); > long s = (new Date()).getTime(); > if (luceneDocument == null) > throw new MessageSearchException("assertion failure: null > document",logger); > try { > queue.put(luceneDocument); > } catch (InterruptedException ie) { > throw new MessageSearchException("failed to add document to > queue:"+ie.getMessage(),ie,logger); > } > logger.debug("document indexed successfully > {"+luceneDocument+"}"); > logger.debug("indexing message end > {"+luceneDocument+"}"); > long e = (new Date()).getTime(); > logger.debug("indexing time {time='"+(e-s)+"'}"); > } > public class IndexProcessor extends Thread { > public IndexProcessor() { > setName("index processor"); > } > public void run() { > boolean exit = false; > //ExecutorService documentPool; > // we abandoned pool as it does not seem to offer any major > performance benefit > LuceneDocument luceneDocument = null; > LinkedList<LuceneDocument> pushbacks = new LinkedList<LuceneDocument>(); > while (!exit) { > try { > //documentPool = > Executors.newFixedThreadPool(Config.getConfig().getArchiver().getArchiveThreads()); > luceneDocument = null; > luceneDocument = (LuceneDocument) queue.take(); > indexLock.lock(); > if (luceneDocument==EXIT_REQ) { > logger.debug("index exit req received. 
exiting"); > exit = true; > continue; > } > try { > openIndex(); > } catch (Exception e) { > logger.error("failed to open > index:"+e.getMessage(),e); > return; > } > if (luceneDocument==null) { > logger.debug("index info is null"); > } > int i = 0; > while(luceneDocument!=null && i<maxSimultaneousDocs) { > try { > Document doc = luceneDocument.getDocument(); > String language = doc.get("lang"); > if (language==null) { > language = > Config.getConfig().getIndex().getIndexLanguage(); > } > > writer.addDocument(doc,AnalyzerFactory.getAnalyzer(language,AnalyzerFactory.Operation.INDEX)); > } catch (IOException io) { > logger.error("failed to add document to > index:"+io.getMessage(),io); > } catch (AlreadyClosedException e) { > pushbacks.add(luceneDocument); > break; > } > //documentPool.execute(new > IndexDocument(luceneDocument,pushbacks)); > i++; > if (i<maxSimultaneousDocs) { > luceneDocument = (LuceneDocument) > queue.poll(); > if > (luceneDocument==null) { > logger.debug("index info is null"); > } > if > (luceneDocument==EXIT_REQ) { > logger.debug("index exit req > received. 
exiting (2)"); > exit = true; > break; > } > } > } > if (pushbacks.size()>0) { > closeIndex(); > try { > openIndex(); > } catch (Exception e) { > logger.error("failed to open > index:"+e.getMessage(),e); > return; > } > for (LuceneDocument pushback : pushbacks) { > try { > > writer.addDocument(pushback.getDocument()); > } catch (IOException io) { > logger.error("failed to add document > to index:"+io.getMessage(),io); > } catch (AlreadyClosedException e) { > pushbacks.add(pushback); > } > //documentPool.execute(new > IndexDocument(pushback,pushbacks)); > i++; > } > } > //documentPool.shutdown(); > //documentPool.awaitTermination(30,TimeUnit.MINUTES); > } catch (Throwable ie) { > logger.error("index write > interrupted:"+ie.getMessage()); > } finally { > closeIndex(); > indexLock.unlock(); > } > } } > public class IndexDocument extends Thread { > LuceneDocument luceneDocument = null; > List<LuceneDocument> pushbacks = null; > public IndexDocument(LuceneDocument > luceneDocument,List<LuceneDocument> pushbacks) { > this.luceneDocument = luceneDocument; > this.pushbacks = pushbacks; > setName("index document"); > } > public void run() { > try { > writer.addDocument(luceneDocument.getDocument()); > } catch (IOException io) { > logger.error("failed to add document to > index:"+io.getMessage(),io); > } catch (AlreadyClosedException e) { > pushbacks.add(luceneDocument); > } catch (Throwable t) { > logger.error("failed to add document to > index:"+t.getMessage(),t); > } > }}; > } > protected void closeIndex() { > try { > indexLock.lock(); > if (writer!=null) { > writer.close(); > } > } catch (Throwable io) { > logger.error("failed to close index > writer:"+io.getMessage(),io); > } finally { > logger.debug("writer closed"); > writer = null; > indexLock.unlock(); > } > } > public void deleteIndex() throws MessageSearchException { > logger.debug("delete index {indexpath='"+indexPath+"'}"); > try { > indexLock.lock(); > try { > int maxIndexChars = > 
Config.getConfig().getIndex().getMaxIndexPerFieldChars(); > writer = new > IndexWriter(FSDirectory.getDirectory(indexPath),analyzer,true,new > IndexWriter.MaxFieldLength(maxIndexChars)); > } catch (Throwable cie) { > logger.error("failed to delete index > {index='"+indexPath+"'}",cie); > return; > } > MessageIndex.volumeIndexes.remove(this); > } finally { > closeIndex(); > indexLock.unlock(); > } > } > public void startup() { > logger.debug("volumeindex is starting up"); > File lockFile = new File(indexPath+File.separatorChar + > "write.lock"); > if (lockFile.exists()) { > logger.warn("The server lock file already exists. Either > another indexer is running or the server was not shutdown correctly."); > logger.warn("If it is the latter, the lock file must be > manually deleted at "+lockFile.getAbsolutePath()); > if (Config.getConfig().getIndex().getMultipleIndexProcesses()) > { > logger.debug("index lock file detected on volumeindex > startup."); > } else { > logger.warn("index lock file detected. the server was > shutdown incorrectly. automatically deleting lock file."); > logger.warn("indexer is configured to deal with only one > indexer process."); > logger.warn("if you are running more than one indexer, > your index could be subject to corruption."); > lockFile.delete(); > } > } > indexProcessor = new IndexProcessor(); > indexProcessor.start(); > Runtime.getRuntime().addShutdownHook(this); > } > public void shutdown() { > logger.debug("volumeindex is shutting down"); > queue.add(EXIT_REQ); > scheduler.shutdownNow(); > } > @Override > public void run() { > queue.add(EXIT_REQ); > } > public interface LuceneDocument { > public String toString(); > public Document getDocument(); > public void finalize(); > } > } > > > > > > --------------------------------------------------------------------- > To unsubscribe, e-mail: java-user-unsubscr...@lucene.apache.org > For additional commands, e-mail: java-user-h...@lucene.apache.org > > -- -