Jamie, did you ever get to the bottom of this? Can you reduce this code to a smaller example that shows the hang?
Also, can you post a thread stack dump when you hit the hang? Is it possible you are adding documents from one thread while calling IndexWriter.close in another? I see you have "catch AlreadyClosedException" when adding docs -- do you ever actually hit that? Mike On Fri, Oct 9, 2009 at 4:47 AM, Jamie Band <ja...@stimulussoft.com> wrote: > Hi Michael > > Thanks for your help. Here are the stacks: > > index processor [TIME_WAITING] CPU time: 33:01 > java.lang.Object.wait(long) > org.apache.lucene.index.IndexWriter.doWait() > org.apache.lucene.index.IndexWriter.shouldClose() > org.apache.lucene.index.IndexWriter.close(boolean) > org.apache.lucene.index.IndexWriter.close() > com.stimulus.archiva.index.VolumeIndex.closeIndex() > com.stimulus.archiva.index.VolumeIndex$IndexProcessor.run() > > The source code to our indexer is attached. As you can see, documents are > added to a blocking queue. The index processor thread takes each one out of the > queue and processes it. After about 60k documents, IndexWriter's close method > enters TIMED_WAITING indefinitely. Is there any workaround to this problem? 
> > > package com.stimulus.archiva.index; > > import java.io.File; > import java.io.IOException; > import java.io.PrintStream; > import javax.mail.MessagingException; > import org.apache.commons.logging.*; > import org.apache.lucene.document.Document; > import org.apache.lucene.index.*; > import org.apache.lucene.store.FSDirectory; > import com.stimulus.archiva.domain.Config; > import com.stimulus.archiva.domain.Email; > import com.stimulus.archiva.domain.EmailID; > import com.stimulus.archiva.domain.Indexer; > import com.stimulus.archiva.domain.Volume; > import com.stimulus.archiva.exception.*; > import com.stimulus.archiva.language.AnalyzerFactory; > import com.stimulus.archiva.search.*; > import java.util.*; > import java.util.concurrent.ExecutorService; > import java.util.concurrent.Executors; > import java.util.concurrent.ScheduledExecutorService; > import java.util.concurrent.ScheduledFuture; > import java.util.concurrent.TimeUnit; > import org.apache.lucene.store.LockObtainFailedException; > import org.apache.lucene.store.AlreadyClosedException; > import java.util.concurrent.locks.ReentrantLock; > import java.util.concurrent.*; > > public class VolumeIndex extends Thread { > protected ArrayBlockingQueue<IndexInfo> queue; > protected static final Log logger = > LogFactory.getLog(VolumeIndex.class.getName()); > IndexWriter writer = null; > Volume volume; > protected static ScheduledExecutorService scheduler; > protected static ScheduledFuture<?> scheduledTask; > protected static IndexInfo EXIT_REQ = new IndexInfo(null); > ReentrantLock indexLock = new ReentrantLock(); > ArchivaAnalyzer analyzer = new ArchivaAnalyzer(); > Indexer indexer = null; > File indexLogFile; > PrintStream indexLogOut; > IndexProcessor indexProcessor; > public VolumeIndex(Indexer indexer, Volume volume) { > logger.debug("creating new volume index {"+volume+"}"); > this.volume = volume; > this.indexer = indexer; > this.queue = new > 
ArrayBlockingQueue<IndexInfo>(Config.getConfig().getIndex().getIndexBacklog()); > try { > indexLogFile = getIndexLogFile(volume); > if (indexLogFile!=null) { > if (indexLogFile.length()>10485760) > indexLogFile.delete(); > indexLogOut = new PrintStream(indexLogFile); > } > logger.debug("set index log file path > {path='"+indexLogFile.getCanonicalPath()+"'}"); > } catch (Exception e) { > logger.error("failed to open index log > file:"+e.getMessage(),e); > } > startup(); > } > protected File getIndexLogFile(Volume volume) { > try { > String indexpath = volume.getIndexPath(); > int lio = indexpath.lastIndexOf(File.separator)+1; > String logfilepath = > indexpath.substring(lio,indexpath.length()-1); > logfilepath += ".log"; > logfilepath = "index_"+logfilepath; > logfilepath = > Config.getFileSystem().getLogPath()+File.separator+logfilepath; > return new File(logfilepath); > } catch (Exception e) { > logger.error("failed to open index log > file:"+e.getMessage(),e); > return null; > } > } > public void deleteMessages(List<String> ids) throws > MessageSearchException { > if (ids == null) > throw new MessageSearchException("assertion failure: null > ids",logger); > Term[] terms = new Term[ids.size()]; > int c = 0; > StringBuffer deleteInfo = new StringBuffer(); > for (String id : ids) { > terms[c++] = new Term("uid",id); > deleteInfo.append(id); > deleteInfo.append(","); > } > String deleteStr = deleteInfo.toString(); > if (deleteStr.length()>0 && > deleteStr.charAt(deleteStr.length()-1)==',') > deleteStr = deleteStr.substring(0,deleteStr.length()-1); > logger.debug("delete messages {'"+deleteInfo+"'}"); > try { > indexLock.lock(); > openIndex(); > try { > writer.deleteDocuments(terms); > writer.expungeDeletes(); > } catch (Exception e) { > throw new MessageSearchException("failed to delete email > from index.",e,logger); > } finally { > } > } finally { > closeIndex(); > indexLock.unlock(); > } > } > protected void openIndex() throws MessageSearchException { > Exception 
lastError = null; > if (writer==null) { > logger.debug("openIndex() index will be opened. it is > currently closed."); > } else { > logger.debug("openIndex() did not bother opening index. it is > already open."); > return; > } > logger.debug("opening index for write {"+volume+"}"); > indexer.prepareIndex(volume); > logger.debug("opening search index for write > {indexpath='"+volume.getIndexPath()+"'}"); > boolean writelock; > int attempt = 0; > int maxattempt = 10; > if > (Config.getConfig().getIndex().getMultipleIndexProcesses()) { > maxattempt = 10000; > } else { > maxattempt = 10; > } > do { > writelock = false; > try { > FSDirectory fsDirectory = > FSDirectory.getDirectory(volume.getIndexPath()); > int maxIndexChars = > Config.getConfig().getIndex().getMaxIndexPerFieldChars(); > writer = new IndexWriter(fsDirectory,analyzer,new > IndexWriter.MaxFieldLength(maxIndexChars)); > if (logger.isDebugEnabled() && indexLogOut!=null) { > writer.setInfoStream(indexLogOut); > } > } catch (LockObtainFailedException lobfe) { > logger.debug("write lock on index. will reopen in > 50ms."); > try { Thread.sleep(50); } catch (Exception e) {} > attempt++; > writelock = true; > } catch (CorruptIndexException cie) { > throw new MessageSearchException("index appears to be > corrupt. 
please reindex the active volume."+cie.getMessage(),logger); > } catch (IOException io) { > throw new MessageSearchException("failed to write document > to index:"+io.getMessage(),logger); > } > } while (writelock && attempt<maxattempt); > if (attempt>=10000) > throw new MessageSearchException("failed to open index writer > {location='"+volume.getIndexPath()+"'}",lastError,logger); > } > public void indexMessage(Email message) throws > MessageSearchException { > logger.debug("index message {"+message+"}"); > long s = (new Date()).getTime(); > if (message == null) > throw new MessageSearchException("assertion failure: null > message",logger); > Document doc = new Document(); > IndexInfo indexInfo = new IndexInfo(doc); > try { > DocumentIndex docIndex = new DocumentIndex(indexer); > String language = doc.get("lang"); > if (language==null) > language = indexer.getIndexLanguage(); > docIndex.write(message,doc,indexInfo); > queue.put(indexInfo); > logger.debug("message indexed successfully > {"+message+",language='"+language+"'}"); > } catch (MessagingException me) { > throw new MessageSearchException("failed to decode message > during indexing",me,logger, ChainedException.Level.DEBUG); > } catch (IOException me) { > throw new MessageSearchException("failed to index > message"+me.getMessage()+" {"+message+"}",me,logger, > ChainedException.Level.DEBUG); > } catch (ExtractionException ee) > { > // we will want to continue indexing > //throw new MessageSearchException("failed to decode > attachments in message {"+message+"}",ee,logger, > ChainedException.Level.DEBUG); > } catch (AlreadyClosedException ace) { > indexMessage(message); > } catch (Throwable e) { > throw new MessageSearchException("failed to index > message:"+e.getMessage(),e,logger, ChainedException.Level.DEBUG); > } > logger.debug("indexing message end {"+message+"}"); > long e = (new Date()).getTime(); > logger.debug("indexing time {time='"+(e-s)+"'}"); > } > public class IndexProcessor extends Thread { > 
public IndexProcessor() { > setName("index processor"); > } > public void run() { > boolean exit = false; > //ExecutorService documentPool; > // we abandoned pool as it does not seem to offer any major > performance benefit > IndexInfo indexInfo = null; > LinkedList<IndexInfo> pushbacks = new LinkedList<IndexInfo>(); > while (!exit) { > try { > int maxIndexDocs = > Config.getConfig().getIndex().getMaxSimultaneousDocs(); > //documentPool = > Executors.newFixedThreadPool(Config.getConfig().getArchiver().getArchiveThreads()); > indexInfo = null; indexInfo = > (IndexInfo) queue.take(); > if (indexInfo==EXIT_REQ) { > logger.debug("index exit req received. exiting"); > exit = true; > continue; > } > indexLock.lock(); > try { > openIndex(); > } catch (Exception e) { > logger.error("failed to open > index:"+e.getMessage(),e); > return; > } > if (indexInfo==null) { > logger.debug("index info is null"); > } > int i = 0; > while(indexInfo!=null && i<maxIndexDocs) { > try { > writer.addDocument(indexInfo.getDocument()); > } catch (IOException io) { > logger.error("failed to add document to > index:"+io.getMessage(),io); > } catch (AlreadyClosedException e) { > pushbacks.add(indexInfo); > } finally { > indexInfo.cleanup(); > } > > //documentPool.execute(new IndexDocument(indexInfo,pushbacks)); > i++; > if (i<maxIndexDocs) { > indexInfo = (IndexInfo) queue.poll(); > if > (indexInfo==null) { > logger.debug("index info is null"); > } > if > (indexInfo==EXIT_REQ) { > logger.debug("index exit req > received. 
exiting (2)"); > exit = true; > break; > } > } > } > for (IndexInfo pushback : > pushbacks) { > try { > writer.addDocument(pushback.getDocument()); > } catch (IOException io) { > logger.error("failed to add document to > index:"+io.getMessage(),io); > } catch (AlreadyClosedException e) { > pushbacks.add(indexInfo); > } finally { > indexInfo.cleanup(); > } > //documentPool.execute(new > IndexDocument(pushback,pushbacks)); > i++; > } > //documentPool.shutdown(); > //documentPool.awaitTermination(30,TimeUnit.MINUTES); > } catch (Throwable ie) { > logger.error("index write > interrupted:"+ie.getMessage()); > } finally { > closeIndex(); > indexLock.unlock(); > } > } } > public class IndexDocument extends Thread { > IndexInfo indexInfo = null; > List<IndexInfo> pushbacks = null; > public IndexDocument(IndexInfo > indexInfo,List<IndexInfo> pushbacks) { > this.indexInfo = indexInfo; > this.pushbacks = pushbacks; > setName("index document"); > } > public void run() { > try { > writer.addDocument(indexInfo.getDocument()); > } catch (IOException io) { > logger.error("failed to add document to > index:"+io.getMessage(),io); > } catch (AlreadyClosedException e) { > pushbacks.add(indexInfo); > } > }}; > } > protected void closeIndex() { > try { > if (writer!=null) { > writer.close(); > logger.debug("writer closed"); > writer = null; > } > } catch (Exception io) { > logger.error("failed to close index > writer:"+io.getMessage(),io); > } > } > public void deleteIndex() throws MessageSearchException { > logger.debug("delete index > {indexpath='"+volume.getIndexPath()+"'}"); > try { > indexLock.lock(); > try { > int maxIndexChars = > Config.getConfig().getIndex().getMaxIndexPerFieldChars(); > writer = new > IndexWriter(FSDirectory.getDirectory(volume.getIndexPath()),analyzer,true,new > IndexWriter.MaxFieldLength(maxIndexChars)); > } catch (Exception cie) { > logger.error("failed to delete index > {index='"+volume.getIndexPath()+"'}",cie); > return; > } > 
MessageIndex.volumeIndexes.remove(this); > } finally { > closeIndex(); > indexLock.unlock(); > } > } > public void startup() { > logger.debug("volumeindex is starting up"); > File lockFile = new File(volume.getIndexPath()+File.separatorChar > + "write.lock"); > if (lockFile.exists()) { > logger.warn("The server lock file already exists. Either > another indexer is running or the server was not shutdown correctly."); > logger.warn("If it is the latter, the lock file must be > manually deleted at "+lockFile.getAbsolutePath()); > if (indexer.getMultipleIndexProcesses()) { > logger.debug("index lock file detected on volumeindex > startup."); > } else { > logger.warn("index lock file detected. the server was > shutdown incorrectly. automatically deleting lock file."); > logger.warn("indexer is configured to deal with only one > indexer process."); > logger.warn("if you are running more than one indexer, > your index could be subject to corruption."); > lockFile.delete(); > } > } > indexProcessor = new IndexProcessor(); > indexProcessor.start(); > Runtime.getRuntime().addShutdownHook(this); > } > public void shutdown() { > logger.debug("volumeindex is shutting down"); > queue.add(EXIT_REQ); > scheduler.shutdownNow(); > } > @Override > public void run() { > queue.add(EXIT_REQ); > } > } > > > > > Is it possible a large merge is running? By default, IW.close waits > for outstanding merges to complete. Can you post the stack trace? > > Mike > > On Thu, Oct 8, 2009 at 5:22 PM, Jamie Band <ja...@stimulussoft.com> wrote: >> >> Hi All >> >> I have a long-running situation where our indexing thread is getting stuck >> indefinitely in IndexWriter's close method. YourKit shows the thread to be >> stuck in TIMED_WAITING. Any ideas on what could be causing this? >> Could it be one of the streams or readers we passed to the document? >> >> I am running Lucene 2.9.0.
>> >> Many thanks in advance >> >> Jamie >> >> >> >> --------------------------------------------------------------------- >> To unsubscribe, e-mail: java-user-unsubscr...@lucene.apache.org >> For additional commands, e-mail: java-user-h...@lucene.apache.org >> >> > > --------------------------------------------------------------------- > To unsubscribe, e-mail: java-user-unsubscr...@lucene.apache.org > For additional commands, e-mail: java-user-h...@lucene.apache.org > > > > > --------------------------------------------------------------------- > To unsubscribe, e-mail: java-user-unsubscr...@lucene.apache.org > For additional commands, e-mail: java-user-h...@lucene.apache.org > > --------------------------------------------------------------------- To unsubscribe, e-mail: java-user-unsubscr...@lucene.apache.org For additional commands, e-mail: java-user-h...@lucene.apache.org