Hello,
I'm developing a java web application for finances quotes'
real-time stream, for hundreds of concurrent users. The solution
involves a long http request serving javascript snippets that will
be executed by broser when received, changing the data shown to the
user.
To do this, I have a Comet servlet, ServletStream, that receives
the request, and registers a new instance of
JavascriptNotificationListener, a wrapper to the response.writer,
to a running worker thread, MarketNotifier, that, when a market
event occurs, fires the JavascriptNotificationListener that writes
the corresponding event javascript snippet to the response.writer.
There is the Flusher, a running thread that awakes every half
second and do a flush on every registered response.writer through
the same JavascriptNotificationListener. And there is a servlet,
StreamAction, that submits commands to the MarketNotifier, to
change the market event subscriptions. Some of these commands can
also fire market events on the JavascriptNotificationListener, and
are run while the ServletStream is sending data.
Every IO on the writer (print, println, flush, checkError) is
synchronized on a mutex Object holded by each instance of
JavascriptNotificationListener, that is, I have a mutex for each
response.writer.
When running this solution, I often use to get the following error:
java.lang.NullPointerException
at
org.apache.coyote.http11.InternalNioOutputBuffer.addToBB(InternalNioOutputBuffer.java:607)
at
org.apache.coyote.http11.InternalNioOutputBuffer.commit(InternalNioOutputBuffer.java:600)
at
org.apache.coyote.http11.Http11NioProcessor.action(Http11NioProcessor.java:1025)
at org.apache.coyote.Response.action(Response.java:183)
at org.apache.coyote.Response.sendHeaders(Response.java:379)
at
org.apache.catalina.connector.OutputBuffer.doFlush(OutputBuffer.java:305)
at
org.apache.catalina.connector.OutputBuffer.flush(OutputBuffer.java:288)
at
org.apache.catalina.connector.CoyoteWriter.flush(CoyoteWriter.java:95)
at
br.com.cedro.stream.JavascriptNotificationListener.syn(JavascriptNotificationListener.java:106)
at br.com.cedro.stream.Flusher.run(Flusher.java:42)
This error occurs even on low usage scenarios (five concurrent
users) almost every hour. After the first occurrence happens, every
other IO on all response.writers catches the same exception.
I'm using apache-tomcat-6.0.14, with default Nio Http Connector
configuration parameters on server.xml.
I think this problem has to do with synchronization for IO
operations. Am I right? Or must I simply check some status before
doing IO?
The Chat Comet tomcat example does synchronization on a single
mutex for all response.writers. Must I do the same? Why? Or can I
use higher-granularity mutexes like I did? If so, what is the right
object to synchronize? Response? Response.writer?
Has anyone developed Comet servlets and faced the same problem?
Here follow the relevant pieces of my source codes (I put some
comments about the problem to increase readability):
public class JavascriptNotificationListener ... {
// This is the only class that manipulates response.writer
...
private PrintWriter out; // set as response.writer on constructor
private Object writeMonitor = new Object(); // the mutex
public JavascriptNotificationListener(PrintWriter responseWriter){
this.out = responseWriter;
synchronized (writeMonitor) { // Althought at this time we
have just one thread here
out.print("<html><head><script language=\"javascript\">");
}
}
...
public void flush(){
// Called by the Flusher thread every 500 msec
synchronized (writeMonitor) {
out.print("</script>\n<script language=\"javascript\">");
// checkError internally calls flush
if(out.checkError()){
log.error("Error flushing event stream.");
}
}
}
...
public void syn(){
// Called by the Flusher thread every five seconds
synchronized (writeMonitor) {
out.print("s();");
if(out.checkError()){
log.error("Error flushing event stream.");
}
}
...
public void fire(MarketEvent e){
// Called by the worker thread
synchronized (writeMonitor) {
out.print(translate(e));
}
}
...
}
public class ServletStream ... {
// The comet stream servlet
...
private static final Flusher flusher = new Flusher(500); // Will
be started only once on Servlet.init
public void event(CometEvent event) throws IOException,
ServletException {
...
final HttpServletResponse response =
event.getHttpServletResponse();
final HttpServletResponse response =
event.getHttpServletResponse();
// A user can only have one session on our system
String username = (String)
request.getSession().getAttribute("username");
if(event.getEventType() == CometEvent.EventType.BEGIN){
if(!AuthStore.checkKey((String)
request.getSession().getAttribute("key"))){
// User unauthenticated
response.getWriter().println("...Permission
Denied..."); // just to simplify: I write an HTML instead
event.close();
} else {
response.setCharacterEncoding("UTF-8");
// This notify object is active and has a working
thread
MarketNotifier notify =
NotifierManager.getOrCreateMarketNotifierForUser(pool, username);
// We wrap our writer inside an object that
understands javascript serialization
JavascriptNotificationListener jsNot = new
JavascriptNotificationListener(response.getWriter());
// Now we subscribe to receive the fire(MarketEvent)
notify.addNotificationListener(jsNot);
// And subscribe to be called on flush() and syn()
flusher.addFlushable(username, jsNot);
// LoadSubscribedEvents is a command to define the
initial listeners for specific market events
// Note that MarketNotifier.process(Command) can
call JavascriptNotificationListener.fire(MarketEvent),
// that is, here we can have another thread acessing
response.writer
// This method is also called by an action servlet
(see the following class)
notify.process(new LoadSubscribedEvents());
}
} else if (event.getEventType() ==
CometEvent.EventType.ERROR || event.getEventType() ==
CometEvent.EventType.END) {
// Stops receiving syn() and flush()
flusher.removeFlushable(username);
MarketNotifier notify =
NotifierManager.getOrCreateMarketNotifierForUser(pool, username);
notify.process(new UnLoadSubscribedEvents());
NotifierManager.freeMarketNotifierForUser(username + ":"
+ sessionId);
event.close();
}
}
}
public class Flusher extends Thread {
// Our Flusher thread
private int msec; // time interval to flush, set on constructor
...
public void run(){
int multiplier = INTERVAL_TO_SYN_IN_SECS * 1000 / msec;
int i = 0;
while(running.get()){
try {
startFlush = new Date();
synchronized (flushMonitor) { // This mutex is also
used on addFlushable and removeFlushable
++i;
for(Flushable f : flushables){
if(i % multiplier == 0){
f.syn();
} else {
f.flush();
}
}
}
try {
long timeToSleep = startFlush.getTime() + msec -
(new Date()).getTime();
if(timeToSleep > 0){
Thread.sleep(timeToSleep);
} else {
log.info("...Flush contention..."); // More
than that, indeed :)
}
} catch (InterruptedException e) {
// Ignore!
}
} catch (Throwable t) {
// Here we catch that exception!
log.error("Fail to flush.", t);
}
}
}
}
public class StreamAction ... {
// Our subscription command processor servlet, called while the
stream is served to change its content
...
public void service(HttpServletRequest request,
HttpServletResponse response) throws ServletException, IOException {
String username = (String)
request.getSession().getAttribute("username");
MarketNotifier notify =
NotifierManager.getOrCreateMarketNotifierForUser(pool, username);
... // Discovers the command to be sent from
request.parameters
if(commandName.equals("SubscribeQuoteEvents")){ //
Simplified to easy understanding
// This can call
JavascriptNotificationListener.fire(MarketEvent)
notify.process(new
SubscribeQuoteEvents(commandParameters));
}
...
}
}
Any ideas?
Thanks
Leonardo Fraga
Web Developer
Cedro Finances
------------------------------------------------------------------------
No virus found in this incoming message.
Checked by AVG Free Edition. Version: 7.5.503 / Virus Database:
269.16.2/1142 - Release Date:
11/20/2007 5:44 PM