Hi There
Our app spends a lot of time waiting for Lucene to finish writing to the
index. I'd like to minimize this. If you have a moment to spare, please
let me know if my LuceneIndex class presented below can be improved upon.
It is used in the following way:
// Build the index wrapper for one volume, then queue a document for indexing.
// Arguments: queue capacity, exit-request sentinel, friendly name,
// index directory, maximum number of documents written per batch.
// Note: queued documents are only consumed once luceneIndex.startup() has
// been called, because startup() is what starts the IndexProcessor thread.
luceneIndex = new LuceneIndex(
        Config.getConfig().getIndex().getIndexBacklog(),
        exitReq,
        volume.getID() + " indexer",
        volume.getIndexPath(),
        Config.getConfig().getIndex().getMaxSimultaneousDocs());
Document doc = new Document();
IndexInfo indexInfo = new IndexInfo(doc);
luceneIndex.indexDocument(indexInfo);
As an aside, is there any way for Lucene to support simultaneous
writes to an index? For example, each writer thread could write to a
separate shard, and after a period the shards could be merged into a
single index. Or is this overkill? I am interested to hear the opinion
of the Lucene experts.
Thanks in advance
Jamie
package com.stimulus.archiva.index;
import java.io.File;
import java.io.IOException;
import java.io.PrintStream;
import org.apache.commons.logging.*;
import org.apache.lucene.document.Document;
import org.apache.lucene.index.*;
import org.apache.lucene.store.FSDirectory;
import java.util.*;
import org.apache.lucene.store.LockObtainFailedException;
import org.apache.lucene.store.AlreadyClosedException;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.*;
public class LuceneIndex extends Thread {
protected ArrayBlockingQueue<LuceneDocument> queue;
protected static final Log logger =
LogFactory.getLog(LuceneIndex.class.getName());
protected static final Log indexLog =
LogFactory.getLog("indexlog");
IndexWriter writer = null;
protected static ScheduledExecutorService scheduler;
protected static ScheduledFuture<?> scheduledTask;
protected LuceneDocument EXIT_REQ = null;
ReentrantLock indexLock = new ReentrantLock();
ArchivaAnalyzer analyzer = new ArchivaAnalyzer();
File indexLogFile;
PrintStream indexLogOut;
IndexProcessor indexProcessor;
String friendlyName;
String indexPath;
int maxSimultaneousDocs;
public LuceneIndex(int queueSize, LuceneDocument exitReq,
String friendlyName, String indexPath,
int maxSimultaneousDocs) {
this.queue = new
ArrayBlockingQueue<LuceneDocument>(queueSize);
this.EXIT_REQ = exitReq;
this.friendlyName = friendlyName;
this.indexPath = indexPath;
this.maxSimultaneousDocs = maxSimultaneousDocs;
setLog(friendlyName);
}
public int getMaxSimultaneousDocs() {
return maxSimultaneousDocs;
}
public void setMaxSimultaneousDocs(int maxSimultaneousDocs) {
this.maxSimultaneousDocs = maxSimultaneousDocs;
}
public ReentrantLock getIndexLock() {
return indexLock;
}
protected void setLog(String logName) {
try {
indexLogFile = getIndexLogFile(logName);
if (indexLogFile!=null) {
if (indexLogFile.length()>10485760)
indexLogFile.delete();
indexLogOut = new PrintStream(indexLogFile);
}
logger.debug("set index log file path
{path='"+indexLogFile.getCanonicalPath()+"'}");
} catch (Exception e) {
logger.error("failed to open index log
file:"+e.getMessage(),e);
}
}
protected File getIndexLogFile(String logName) {
try {
String logfilepath =
Config.getFileSystem().getLogPath()+File.separator+logName+"index.log";
return new File(logfilepath);
} catch (Exception e) {
logger.error("failed to open index log
file:"+e.getMessage(),e);
return null;
}
}
protected void openIndex() throws MessageSearchException {
Exception lastError = null;
if (writer==null) {
logger.debug("openIndex() index "+friendlyName+" will be
opened. it is currently closed.");
} else {
logger.debug("openIndex() did not bother opening index
"+friendlyName+". it is already open.");
return;
}
logger.debug("opening index "+friendlyName+" for write");
logger.debug("opening search index "+friendlyName+" for
write {indexpath='"+indexPath+"'}");
boolean writelock;
int attempt = 0;
int maxattempt = 10;
if (Config.getConfig().getIndex().getMultipleIndexProcesses()) {
maxattempt = 10000;
} else {
maxattempt = 10;
}
do {
writelock = false;
try {
FSDirectory fsDirectory =
FSDirectory.getDirectory(indexPath);
int maxIndexChars =
Config.getConfig().getIndex().getMaxIndexPerFieldChars();
writer = new
IndexWriter(fsDirectory,analyzer,new
IndexWriter.MaxFieldLength(maxIndexChars));
if (indexLog.isDebugEnabled() &&
indexLogOut!=null) {
writer.setInfoStream(indexLogOut);
}
} catch (LockObtainFailedException lobfe) {
logger.debug("write lock on index
"+friendlyName+". will reopen in 50ms.");
try { Thread.sleep(50); } catch (Exception e) {}
attempt++;
writelock = true;
} catch (CorruptIndexException cie) {
throw new MessageSearchException("index
"+friendlyName+" appears to be corrupt. please reindex the active
volume."+cie.getMessage(),logger);
} catch (Throwable io) {
throw new MessageSearchException("failed to write
document to index "+friendlyName+":"+io.getMessage(),logger);
}
} while (writelock && attempt<maxattempt);
if (attempt>=10000)
throw new MessageSearchException("failed to open index
"+friendlyName+" writer {indexPath='"+indexPath+"'}",lastError,logger);
}
public void indexDocument(LuceneDocument luceneDocument) throws
MessageSearchException {
logger.debug("index document {"+luceneDocument+"}");
long s = (new Date()).getTime();
if (luceneDocument == null)
throw new MessageSearchException("assertion failure:
null document",logger);
try {
queue.put(luceneDocument);
} catch (InterruptedException ie) {
throw new MessageSearchException("failed to add document
to queue:"+ie.getMessage(),ie,logger);
}
logger.debug("document indexed successfully
{"+luceneDocument+"}");
logger.debug("indexing message end {"+luceneDocument+"}");
long e = (new Date()).getTime();
logger.debug("indexing time {time='"+(e-s)+"'}");
}
public class IndexProcessor extends Thread {
public IndexProcessor() {
setName("index processor");
}
public void run() {
boolean exit = false;
//ExecutorService documentPool;
// we abandoned pool as it does not seem to offer any
major performance benefit
LuceneDocument luceneDocument = null;
LinkedList<LuceneDocument> pushbacks = new
LinkedList<LuceneDocument>();
while (!exit) {
try {
//documentPool =
Executors.newFixedThreadPool(Config.getConfig().getArchiver().getArchiveThreads());
luceneDocument = null;
luceneDocument = (LuceneDocument) queue.take();
indexLock.lock();
if (luceneDocument==EXIT_REQ) {
logger.debug("index exit req received.
exiting");
exit = true;
continue;
}
try {
openIndex();
} catch (Exception e) {
logger.error("failed to open
index:"+e.getMessage(),e);
return;
}
if (luceneDocument==null) {
logger.debug("index info is null");
}
int i = 0;
while(luceneDocument!=null &&
i<maxSimultaneousDocs) {
try {
Document doc = luceneDocument.getDocument();
String language = doc.get("lang");
if (language==null) {
language =
Config.getConfig().getIndex().getIndexLanguage();
}
writer.addDocument(doc,AnalyzerFactory.getAnalyzer(language,AnalyzerFactory.Operation.INDEX));
} catch (IOException io) {
logger.error("failed to add document to
index:"+io.getMessage(),io);
} catch (AlreadyClosedException e) {
pushbacks.add(luceneDocument);
break;
}
//documentPool.execute(new
IndexDocument(luceneDocument,pushbacks));
i++;
if (i<maxSimultaneousDocs) {
luceneDocument = (LuceneDocument)
queue.poll();
if (luceneDocument==null) {
logger.debug("index info is null");
}
if (luceneDocument==EXIT_REQ) {
logger.debug("index exit req
received. exiting (2)");
exit = true;
break;
}
}
}
if (pushbacks.size()>0) {
closeIndex();
try {
openIndex();
} catch (Exception e) {
logger.error("failed to open
index:"+e.getMessage(),e);
return;
}
for (LuceneDocument pushback : pushbacks) {
try {
writer.addDocument(pushback.getDocument());
} catch (IOException io) {
logger.error("failed to add
document to index:"+io.getMessage(),io);
} catch (AlreadyClosedException e) {
pushbacks.add(pushback);
}
//documentPool.execute(new
IndexDocument(pushback,pushbacks));
i++;
}
}
//documentPool.shutdown();
//documentPool.awaitTermination(30,TimeUnit.MINUTES);
} catch (Throwable ie) {
logger.error("index write
interrupted:"+ie.getMessage());
} finally {
closeIndex();
indexLock.unlock();
}
}
}
public class IndexDocument extends Thread {
LuceneDocument luceneDocument = null;
List<LuceneDocument> pushbacks = null;
public IndexDocument(LuceneDocument
luceneDocument,List<LuceneDocument> pushbacks) {
this.luceneDocument = luceneDocument;
this.pushbacks = pushbacks;
setName("index document");
}
public void run() {
try {
writer.addDocument(luceneDocument.getDocument());
} catch (IOException io) {
logger.error("failed to add document to
index:"+io.getMessage(),io);
} catch (AlreadyClosedException e) {
pushbacks.add(luceneDocument);
} catch (Throwable t) {
logger.error("failed to add document to
index:"+t.getMessage(),t);
}
}};
}
protected void closeIndex() {
try {
indexLock.lock();
if (writer!=null) {
writer.close();
}
} catch (Throwable io) {
logger.error("failed to close index
writer:"+io.getMessage(),io);
} finally {
logger.debug("writer closed");
writer = null;
indexLock.unlock();
}
}
public void deleteIndex() throws MessageSearchException {
logger.debug("delete index {indexpath='"+indexPath+"'}");
try {
indexLock.lock();
try {
int maxIndexChars =
Config.getConfig().getIndex().getMaxIndexPerFieldChars();
writer = new
IndexWriter(FSDirectory.getDirectory(indexPath),analyzer,true,new
IndexWriter.MaxFieldLength(maxIndexChars));
} catch (Throwable cie) {
logger.error("failed to delete index
{index='"+indexPath+"'}",cie);
return;
}
MessageIndex.volumeIndexes.remove(this);
} finally {
closeIndex();
indexLock.unlock();
}
}
public void startup() {
logger.debug("volumeindex is starting up");
File lockFile = new File(indexPath+File.separatorChar +
"write.lock");
if (lockFile.exists()) {
logger.warn("The server lock file already exists. Either
another indexer is running or the server was not shutdown correctly.");
logger.warn("If it is the latter, the lock file must be
manually deleted at "+lockFile.getAbsolutePath());
if
(Config.getConfig().getIndex().getMultipleIndexProcesses()) {
logger.debug("index lock file detected on
volumeindex startup.");
} else {
logger.warn("index lock file detected. the server
was shutdown incorrectly. automatically deleting lock file.");
logger.warn("indexer is configured to deal with only
one indexer process.");
logger.warn("if you are running more than one
indexer, your index could be subject to corruption.");
lockFile.delete();
}
}
indexProcessor = new IndexProcessor();
indexProcessor.start();
Runtime.getRuntime().addShutdownHook(this);
}
public void shutdown() {
logger.debug("volumeindex is shutting down");
queue.add(EXIT_REQ);
scheduler.shutdownNow();
}
@Override
public void run() {
queue.add(EXIT_REQ);
}
public interface LuceneDocument {
public String toString();
public Document getDocument();
public void finalize();
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: java-user-unsubscr...@lucene.apache.org
For additional commands, e-mail: java-user-h...@lucene.apache.org