costin 01/12/12 13:36:17 Modified: jk/native2/common jk_workerEnv.c Log: Add logic to look for all handlers and create the dispatch table. Moved the main dispatcher logic from ajp_worker/ajp_endpoint - it is a generic mechanism that can be used by any worker ( of course, not required, like everything else ) Revision Changes Path 1.6 +206 -23 jakarta-tomcat-connectors/jk/native2/common/jk_workerEnv.c Index: jk_workerEnv.c =================================================================== RCS file: /home/cvs/jakarta-tomcat-connectors/jk/native2/common/jk_workerEnv.c,v retrieving revision 1.5 retrieving revision 1.6 diff -u -r1.5 -r1.6 --- jk_workerEnv.c 2001/12/06 23:00:44 1.5 +++ jk_workerEnv.c 2001/12/12 21:36:17 1.6 @@ -59,7 +59,7 @@ * Description: Workers controller * * Author: Gal Shachor <[EMAIL PROTECTED]> * * Author: Henri Gomez <[EMAIL PROTECTED]> * - * Version: $Revision: 1.5 $ * + * Version: $Revision: 1.6 $ * ***************************************************************************/ #include "jk_workerEnv.h" @@ -111,23 +111,16 @@ /* Ignore it, other workers may be ok. 
return JK_FALSE; */ } else { - map_put(_this->worker_map, worker_list[i], w, (void *)&oldw); - - if(oldw!=NULL) { - l->jkLog(_this->l, JK_LOG_DEBUG, - "build_worker_map, removing old %s worker \n", - worker_list[i]); - oldw->destroy(&oldw, _this->l); - } if( _this->defaultWorker == NULL ) _this->defaultWorker=w; } } - - l->jkLog(_this->l, JK_LOG_DEBUG, "build_worker_map, done\n"); - l->jkLog(_this->l, JK_LOG_DEBUG, - "workerEnv.init() done: %d %s\n", _this->num_of_workers, worker_list[0]); + jk_workerEnv_initHandlers( _this ); + + l->jkLog(_this->l, JK_LOG_INFO, + "workerEnv.init() %d workers, default %s\n", + _this->num_of_workers, worker_list[0]); return JK_TRUE; } @@ -164,15 +157,17 @@ jk_logger_t *l=_this->l; if(!name) { - l->jkLog(l, JK_LOG_ERROR, "wc_get_worker_for_name NULL name\n"); + l->jkLog(l, JK_LOG_ERROR, + "workerEnv.getWorkerForName() NullPointerException\n"); return NULL; } rc = map_get(_this->worker_map, name, NULL); - if( rc==NULL ) { - l->jkLog(l, JK_LOG_ERROR, "getWorkerForName: no worker found for %s \n", name); - } + /* if( rc==NULL ) { */ + /* l->jkLog(l, JK_LOG_INFO, */ + /* "workerEnv.getWorkerForName(): no worker found for %s \n", name); */ + /* } */ return rc; } @@ -211,6 +206,175 @@ } +static void jk_workerEnv_checkSpace( jk_pool_t *pool, + void ***tableP, int *sizeP, int id ) +{ + void **newTable; + int i; + int newSize=id+4; + + if( *sizeP > id ) return; + /* resize the table */ + newTable=(void **)pool->calloc( pool, newSize * sizeof( void *)); + for( i=0; i<*sizeP; i++ ) { + newTable[i]= (*tableP)[i]; + } + *tableP=newTable; + *sizeP=newSize; +} + +static void jk_workerEnv_initHandlers(jk_workerEnv_t *_this) +{ + /* Find the max message id */ + /* XXX accessing private data... 
env must provide some method to get this */ + jk_map_t *registry=_this->env->_registry; + int size=map_size( registry ); + int i,j; + + for( i=0; i<size; i++ ) { + jk_handler_t *handler; + jk_map_t *localHandlers; + int rc; + + char *name=map_name_at( registry, i ); + if( strstr( name, "handler" ) == name ) { + char *type=name+ strlen( "handler" ) +1; + printf("XXX Found handler: %s %s\n", name, type ); + + localHandlers=(jk_map_t *)_this->env->getInstance(_this->env, + _this->pool, + "handler", + type ); + if( localHandlers==NULL ) continue; + + for( j=0; j< map_size( localHandlers ); j++ ) { + handler=(jk_handler_t *)map_value_at( localHandlers, j ); + jk_workerEnv_checkSpace( _this->pool, + (void ***)&_this->handlerTable, + &_this->lastMessageId, + handler->messageId ); + _this->handlerTable[ handler->messageId ]=handler; + _this->l->jkLog( _this->l, JK_LOG_INFO, "Registerd %s %d \n", + handler->name, handler->messageId); + } + } + } +} + + +/* + * Process incoming messages. + * + * We will know only at read time if the remote host closed + * the connection (half-closed state - FIN-WAIT2). In that case + * we must close our side of the socket and abort emission. + * We will need another connection to send the request + * There is need of refactoring here since we mix + * reply reception (tomcat -> apache) and request send (apache -> tomcat) + * and everything using the same buffer (repmsg) + * ajp13/ajp14 is async but handling read/send this way prevents nice recovery + * In fact if tomcat link is broken during upload (browser ->apache ->tomcat) + * we'll lose data and we'll have to abort the whole request. 
+ */ +static int jk_workerEnv_processCallbacks(jk_workerEnv_t *_this, + jk_endpoint_t *e, + jk_ws_service_t *r ) +{ + int code; + jk_handler_t *handler; + int rc; + jk_handler_t **handlerTable=e->worker->workerEnv->handlerTable; + int maxHandler=e->worker->workerEnv->lastMessageId; + + /* Process reply - this is the main loop */ + /* Start read all reply message */ + while(1) { + rc=-1; + handler=NULL; + + _this->l->jkLog(_this->l, JK_LOG_INFO, + "ajp14.processCallbacks() Waiting reply\n"); + e->reply->reset(e->reply); + + rc= e->reply->receive( e->reply, e ); + if( rc!=JK_TRUE ) { + _this->l->jkLog(_this->l, JK_LOG_ERROR, + "ajp14.service() Error reading reply\n"); + /* we just can't recover, unset recover flag */ + return JK_FALSE; + } + + code = (int)e->reply->getByte(e->reply); + if( code < maxHandler ) { + handler=handlerTable[ code ]; + } + + if( handler==NULL ) { + _this->l->jkLog(_this->l, JK_LOG_ERROR, + "ajp14.processCallback() Invalid code: %d\n", code); + e->reply->dump(e->reply, _this->l, "Message: "); + return JK_FALSE; + } + + _this->l->jkLog(_this->l, JK_LOG_DEBUG, + "ajp14.dispath() Calling %d %s\n", handler->messageId, + handler->name); + + /* Call the message handler */ + rc=handler->callback( e->reply, r, e, _this->l ); + + /* Process the status code returned by handler */ + switch( rc ) { + case JK_HANDLER_LAST: + /* no more data to be sent, fine we have finish here */ + return JK_TRUE; + case JK_HANDLER_OK: + /* One-time request, continue to listen */ + break; + case JK_HANDLER_RESPONSE: + /* + * in upload-mode there is no second chance since + * we may have allready send part of uploaded data + * to Tomcat. + * In this case if Tomcat connection is broken we must + * abort request and indicate error. 
+ * A possible work-around could be to store the uploaded + * data to file and replay for it + */ + e->recoverable = JK_FALSE; + rc = e->post->send(e->post, e ); + if (rc < 0) { + _this->l->jkLog(_this->l, JK_LOG_ERROR, + "ajp14.processCallbacks() error sending response data\n"); + return JK_FALSE; + } + break; + case JK_HANDLER_ERROR: + /* + * we won't be able to gracefully recover from this so + * set recoverable to false and get out. + */ + e->recoverable = JK_FALSE; + return JK_FALSE; + case JK_HANDLER_FATAL: + /* + * Client has stop talking to us, so get out. + * We assume this isn't our fault, so just a normal exit. + * In most (all?) cases, the ajp13_endpoint::reuse will still be + * false here, so this will be functionally the same as an + * un-recoverable error. We just won't log it as such. + */ + return JK_FALSE; + default: + /* Unknown status */ + /* return JK_FALSE; */ + } + } + return JK_FALSE; +} + + + static jk_worker_t *jk_workerEnv_createWorker(jk_workerEnv_t *_this, const char *name, @@ -221,8 +385,17 @@ jk_env_objectFactory_t fac; jk_logger_t *l=_this->l; jk_worker_t *w = NULL; + jk_worker_t *oldW = NULL; jk_pool_t *workerPool; + /* First find if it already exists */ + w=_this->getWorkerForName( _this, name ); + if( w != NULL ) { + l->jkLog(l, JK_LOG_INFO, + "workerEnv.createWorker(): Using existing worker %s\n",name); + return w; + } + workerPool=_this->pool->create(_this->pool, HUGE_POOL_SIZE); type=map_getStrProp( init_data,"worker",name,"type",NULL ); @@ -230,7 +403,8 @@ /* Each worker has it's own pool */ - w=(jk_worker_t *)_this->env->getInstance(_this->env, workerPool, "worker", type ); + w=(jk_worker_t *)_this->env->getInstance(_this->env, workerPool, "worker", + type ); if( w == NULL ) { l->jkLog(l, JK_LOG_ERROR, @@ -242,7 +416,7 @@ w->pool=workerPool; w->name=(char *)name; w->workerEnv=_this; - + err=w->validate(w, init_data, _this, l); if( err!=JK_TRUE ) { @@ -253,11 +427,8 @@ return NULL; } - l->jkLog(l, JK_LOG_DEBUG, - 
"workerEnv.createWorker(): validated %s:%s\n", - type, name); - err=w->init(w, init_data, _this, l); + if(err!=JK_TRUE) { w->destroy(&w, l); l->jkLog(l, JK_LOG_ERROR, "workerEnv.createWorker() init failed for %s\n", @@ -265,6 +436,17 @@ return NULL; } + l->jkLog(l, JK_LOG_INFO, + "workerEnv.createWorker(): validate and init %s:%s\n", type, name); + + map_put(_this->worker_map, name, w, (void *)&oldW); + + if(oldW!=NULL) { + l->jkLog(_this->l, JK_LOG_ERROR, "workerEnv.createWorker() duplicated %s worker \n", + name); + oldW->destroy(&oldW, _this->l); + } + return w; } @@ -361,6 +543,7 @@ _this->close=&jk_workerEnv_close; _this->createWorker=&jk_workerEnv_createWorker; _this->createWebapp=&jk_workerEnv_createWebapp; + _this->processCallbacks=&jk_workerEnv_processCallbacks; _this->rootWebapp=_this->createWebapp( _this, NULL, "/", NULL );
-- To unsubscribe, e-mail: <mailto:[EMAIL PROTECTED]> For additional commands, e-mail: <mailto:[EMAIL PROTECTED]>