mturk 2002/06/29 00:48:20 Modified: jk/native2/common jk_worker_lb.c Log: Serialize the calls to the workers until the first call finishes or times out. Added new config param 'initTimeout' setting the worker initialization timeout. Revision Changes Path 1.21 +88 -7 jakarta-tomcat-connectors/jk/native2/common/jk_worker_lb.c Index: jk_worker_lb.c =================================================================== RCS file: /home/cvs/jakarta-tomcat-connectors/jk/native2/common/jk_worker_lb.c,v retrieving revision 1.20 retrieving revision 1.21 diff -u -r1.20 -r1.21 --- jk_worker_lb.c 22 Jun 2002 16:57:54 -0000 1.20 +++ jk_worker_lb.c 29 Jun 2002 07:48:20 -0000 1.21 @@ -70,6 +70,10 @@ #include "jk_env.h" #include "jk_requtil.h" +#ifdef HAS_APR +#include "apr_thread_proc.h" +#endif + #define DEFAULT_LB_FACTOR (1.0) /* Time to wait before retry... XXX make it configurable*/ @@ -79,6 +83,12 @@ #define NO_WORKER_MSG "The servlet container is temporary unavailable or being upgraded\n"; +typedef struct { + struct jk_mutex *cs; + int init_timeout; + int initializing; +} jk_worker_lb_private_t; + /** Find the best worker. In process, check if timeout expired for workers that failed in the past and give them another chance. 
@@ -281,6 +291,8 @@ { int attempt=0; jk_workerEnv_t *wEnv=lb->workerEnv; + jk_worker_lb_private_t *lb_priv = lb->worker_private; + jk_worker_t *rec = NULL; if( s==NULL ) { env->l->jkLog(env, env->l, JK_LOG_ERROR, @@ -309,10 +321,29 @@ } while(1) { - jk_worker_t *rec; int rc; - rec=jk2_get_most_suitable_worker(env, lb, s, attempt); + /* Since there is potential worker state change + * make the find best worker call thread safe + */ + if (!lb_priv->initializing) { + if (lb_priv->cs != NULL) + lb_priv->cs->lock(env, lb_priv->cs); + rec=jk2_get_most_suitable_worker(env, lb, s, attempt); + + if (lb_priv->cs != NULL) + lb_priv->cs->unLock(env, lb_priv->cs); + } + else if (!rec){ + /* If we are initializing the service wait until + * the initialization finishes or times out + */ + if (lb->cs != NULL) { + lb->cs->lock(env, lb->cs); + lb->cs->unLock(env, lb->cs); + } + continue; + } attempt++; s->is_recoverable_error = JK_FALSE; @@ -354,16 +385,45 @@ s->jvm_route = rec->route; s->realWorker = rec; - + if (!rec->initialized && !lb_priv->initializing && lb->cs != NULL) { + /* If the worker has not been called yet serialize the call */ + lb->cs->lock(env, lb->cs); + lb_priv->initializing = JK_TRUE; + rec->error_time = time(NULL); + } rc = rec->service(env, rec, s); if(rc==JK_OK) { rec->in_error_state = JK_FALSE; rec->error_time = 0; - /* the endpoint that succeeded is saved for done() */ + /* Set the initialized flag to TRUE if that was the first call */ + if (!rec->initialized && lb->cs != NULL) { + rec->initialized = JK_TRUE; + lb_priv->initializing = JK_FALSE; + lb->cs->unLock(env, lb->cs); + } return JK_OK; } + if (!rec->initialized && lb->cs != NULL) { + time_t now = time(NULL); + /* In the case of initialization timeout disable the worker */ + if ((int)(now - rec->error_time) > lb_priv->init_timeout) { + rec->mbean->disabled = JK_TRUE; + lb_priv->initializing = JK_FALSE; + s->is_recoverable_error = JK_FALSE; + env->l->jkLog(env, env->l, JK_LOG_ERROR, + 
"lb_worker.service() worker init timeout for %s\n", + rec->channelName); + lb->cs->unLock(env, lb->cs); + } + else { +#ifdef HAS_APR + apr_thread_yield(); +#endif + continue; + } + } env->l->jkLog(env, env->l, JK_LOG_ERROR, "lb.service() worker failed %s\n", rec->mbean->name ); @@ -373,7 +433,7 @@ * Time for fault tolerance (if possible)... */ rec->in_error_state = JK_TRUE; - rec->error_time = time(0); + rec->error_time = time(NULL); if(!s->is_recoverable_error) { /* Error is not recoverable - break with an error. */ @@ -464,6 +524,9 @@ lb->noWorkerCode=atoi( value ); } else if( strcmp( name, "hwBalanceErr") == 0 ) { lb->hwBalanceErr=atoi( value ); + } else if( strcmp( name, "initTimeout") == 0 ) { + jk_worker_lb_private_t *priv = lb->worker_private; + priv->init_timeout=atoi( value ); } return JK_ERR; @@ -506,7 +569,9 @@ { jk_worker_t *w; int i; - + jk_bean_t *jkb; + jk_worker_lb_private_t *worker_private; + if(NULL == name ) { env->l->jkLog(env, env->l, JK_LOG_ERROR, "lb_worker.factory() NullPointerException\n"); @@ -520,8 +585,24 @@ "lb_worker.factory() OutOfMemoryException\n"); return JK_ERR; } + + jkb=env->createBean2(env, pool,"threadMutex", NULL); + if( jkb != NULL ) { + w->cs=jkb->object; + jkb->init(env, jkb ); + } - w->worker_private = NULL; + worker_private = (jk_worker_lb_private_t *)pool->calloc(env, + pool, sizeof(jk_worker_lb_private_t)); + + /* one minute service startup */ + worker_private->init_timeout = 60; + jkb=env->createBean2(env, pool,"threadMutex", NULL); + if( jkb != NULL ) { + worker_private->cs=jkb->object; + jkb->init(env, jkb ); + } + w->worker_private = worker_private; w->service = jk2_lb_service; for( i=0; i<JK_LB_LEVELS; i++ ) {
-- To unsubscribe, e-mail: <mailto:[EMAIL PROTECTED]> For additional commands, e-mail: <mailto:[EMAIL PROTECTED]>